Coverage for src/dirschema/json/validate.py: 98%

51 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-07 09:34 +0000

1"""Helper functions to perform validation of JSON-compatible metadata files.""" 

2 

3from pathlib import Path 

4from typing import Dict, List, Optional, Type, Union 

5 

6from jsonschema import Draft202012Validator 

7 

8from .handler import ValidationHandler 

9from .handlers import loaded_handlers 

10from .parse import load_json, to_uri 

11 

12JSONValidationErrors = Dict[str, List[str]] 

13"""JSON validation errors mapping from JSON Pointers to of error message lists.""" 

14 

15 

16def plugin_from_uri(custom_uri: str) -> ValidationHandler: 

17 """Parse a validation plugin pseudo-URI, return the plugin class and args string.""" 

18 try: 

19 if not custom_uri.startswith("v#"): 

20 raise ValueError 

21 ep, args = custom_uri[2:].split("://") 

22 if ep == "": 

23 raise ValueError 

24 except ValueError: 

25 msg = f"Invalid custom validator plugin pseudo-URI: '{custom_uri}'" 

26 raise ValueError(msg) from None 

27 

28 try: 

29 h: Type[ValidationHandler] = loaded_handlers[ep] 

30 return h(args) 

31 except KeyError: 

32 raise ValueError(f"Validator entry-point not found: '{ep}'") from None 

33 

34 

35def validate_custom(dat, plugin_str: str) -> JSONValidationErrors: 

36 """Perform validation based on a validation handler string.""" 

37 h = plugin_from_uri(plugin_str) 

38 if h._for_json: 

39 return h.validate_json(dat, h.args) 

40 else: 

41 return h.validate_raw(dat, h.args) 

42 

43 

44def validate_jsonschema(dat, schema: Union[bool, Dict]) -> JSONValidationErrors: 

45 """Perform validation of a dict based on a JSON Schema.""" 

46 v = Draft202012Validator(schema=schema) # type: ignore 

47 errs: Dict[str, List[str]] = {} 

48 for verr in sorted(v.iter_errors(dat), key=lambda e: e.path): # type: ignore 

49 key = "/" + "/".join(map(str, verr.path)) # JSON Pointer into document 

50 if key not in errs: 

51 errs[key] = [] 

52 errs[key].append(verr.message) 

53 return errs 

54 

55 

56def resolve_validator( 

57 schema_or_ref: Union[bool, str, Dict], 

58 *, 

59 local_basedir: Optional[Path] = None, 

60 relative_prefix: str = "", 

61) -> Union[bool, Dict, ValidationHandler]: 

62 """Resolve passed object into a schema or validator. 

63 

64 If passed object is already a schema, will return it. 

65 If passed object is a string, will load the referenced schema 

66 or instantiate the custom validator (a string starting with `v#`). 

67 """ 

68 if isinstance(schema_or_ref, bool) or isinstance(schema_or_ref, dict): 

69 # embedded schema 

70 return schema_or_ref 

71 

72 if not schema_or_ref.startswith("v#"): 

73 # load schema from URI 

74 uri = to_uri(schema_or_ref, local_basedir, relative_prefix) 

75 return load_json(uri, local_basedir=local_basedir) 

76 

77 # custom validation, not json schema 

78 return plugin_from_uri(schema_or_ref) 

79 

80 

81def validate_metadata( 

82 dat, 

83 schema: Union[bool, str, Dict, ValidationHandler], 

84 *, 

85 local_basedir: Optional[Path] = None, 

86 relative_prefix: str = "", 

87) -> JSONValidationErrors: 

88 """Validate object (dict or byte stream) using JSON Schema or custom validator. 

89 

90 The validator must be either a JSON Schema dict, or a string 

91 pointing to a JSON Schema, or a custom validator handler string. 

92 

93 Returns a dict mapping from JSON Pointers to a list of errors in that location. 

94 If the dict is empty, no validation errors were detected. 

95 """ 

96 if isinstance(schema, str): 

97 val = resolve_validator( 

98 schema, local_basedir=local_basedir, relative_prefix=relative_prefix 

99 ) 

100 else: 

101 val = schema 

102 

103 if isinstance(val, ValidationHandler): 

104 return val.validate(dat) 

105 else: 

106 return validate_jsonschema(dat, val)