Coverage for src/dirschema/json/validate.py: 98%

50 statements  

« prev     ^ index     » next       coverage.py v7.2.5, created at 2023-05-08 15:24 +0000

1"""Helper functions to perform validation of JSON-compatible metadata files.""" 

2 

3from pathlib import Path 

4from typing import Dict, List, Optional, Type, Union 

5 

6from jsonschema import Draft202012Validator 

7 

8from .handler import ValidationHandler 

9from .handlers import loaded_handlers 

10from .parse import load_json, to_uri 

11 

12JSONValidationErrors = Dict[str, List[str]] 

13"""JSON validation errors mapping from JSON Pointers to of error message lists.""" 

14 

15 

16def plugin_from_uri(custom_uri: str) -> ValidationHandler: 

17 """Parse a validation plugin pseudo-URI, return the plugin class and args string.""" 

18 try: 

19 if not custom_uri.startswith("v#"): 

20 raise ValueError 

21 ep, args = custom_uri[2:].split("://") 

22 if ep == "": 

23 raise ValueError 

24 except ValueError: 

25 raise ValueError(f"Invalid custom validator plugin pseudo-URI: '{custom_uri}'") 

26 

27 try: 

28 h: Type[ValidationHandler] = loaded_handlers[ep] 

29 return h(args) 

30 except KeyError: 

31 raise ValueError(f"Validator entry-point not found: '{ep}'") 

32 

33 

34def validate_custom(dat, plugin_str: str) -> JSONValidationErrors: 

35 """Perform validation based on a validation handler string.""" 

36 h = plugin_from_uri(plugin_str) 

37 if h._for_json: 

38 return h.validate_json(dat, h.args) 

39 else: 

40 return h.validate_raw(dat, h.args) 

41 

42 

43def validate_jsonschema(dat, schema: Union[bool, Dict]) -> JSONValidationErrors: 

44 """Perform validation of a dict based on a JSON Schema.""" 

45 v = Draft202012Validator(schema=schema) # type: ignore 

46 errs: Dict[str, List[str]] = {} 

47 for verr in sorted(v.iter_errors(dat), key=lambda e: e.path): # type: ignore 

48 key = "/" + "/".join(map(str, verr.path)) # JSON Pointer into document 

49 if key not in errs: 

50 errs[key] = [] 

51 errs[key].append(verr.message) 

52 return errs 

53 

54 

55def resolve_validator( 

56 schema_or_ref: Union[bool, str, Dict], 

57 *, 

58 local_basedir: Optional[Path] = None, 

59 relative_prefix: str = "", 

60) -> Union[bool, Dict, ValidationHandler]: 

61 """Resolve passed object into a schema or validator. 

62 

63 If passed object is already a schema, will return it. 

64 If passed object is a string, will load the referenced schema 

65 or instantiate the custom validator (a string starting with `v#`). 

66 """ 

67 if isinstance(schema_or_ref, bool) or isinstance(schema_or_ref, dict): 

68 # embedded schema 

69 return schema_or_ref 

70 

71 if not schema_or_ref.startswith("v#"): 

72 # load schema from URI 

73 uri = to_uri(schema_or_ref, local_basedir, relative_prefix) 

74 return load_json(uri, local_basedir=local_basedir) 

75 

76 # custom validation, not json schema 

77 return plugin_from_uri(schema_or_ref) 

78 

79 

80def validate_metadata( 

81 dat, 

82 schema: Union[bool, str, Dict, ValidationHandler], 

83 *, 

84 local_basedir: Optional[Path] = None, 

85 relative_prefix: str = "", 

86) -> JSONValidationErrors: 

87 """Validate object (dict or byte stream) using JSON Schema or custom validator. 

88 

89 The validator must be either a JSON Schema dict, or a string 

90 pointing to a JSON Schema, or a custom validator handler string. 

91 

92 Returns a dict mapping from JSON Pointers to a list of errors in that location. 

93 If the dict is empty, no validation errors were detected. 

94 """ 

95 if isinstance(schema, str): 

96 val = resolve_validator( 

97 schema, local_basedir=local_basedir, relative_prefix=relative_prefix 

98 ) 

99 else: 

100 val = schema 

101 

102 if isinstance(val, ValidationHandler): 

103 return val.validate(dat) 

104 else: 

105 return validate_jsonschema(dat, val)