Coverage for src/dirschema/json/validate.py: 98%
50 statements
« prev ^ index » next coverage.py v7.2.5, created at 2023-05-08 15:24 +0000
« prev ^ index » next coverage.py v7.2.5, created at 2023-05-08 15:24 +0000
1"""Helper functions to perform validation of JSON-compatible metadata files."""
3from pathlib import Path
4from typing import Dict, List, Optional, Type, Union
6from jsonschema import Draft202012Validator
8from .handler import ValidationHandler
9from .handlers import loaded_handlers
10from .parse import load_json, to_uri
12JSONValidationErrors = Dict[str, List[str]]
13"""JSON validation errors, mapping from JSON Pointers to lists of error messages."""
def plugin_from_uri(custom_uri: str) -> ValidationHandler:
    """Instantiate the validation handler referenced by a plugin pseudo-URI.

    The pseudo-URI must have the shape ``v#ENTRYPOINT://ARGS``: the literal
    prefix ``v#``, a non-empty entry-point name, the separator ``://`` exactly
    once, and an argument string (which may be empty).

    The entry-point name is looked up in ``loaded_handlers`` and the class
    found there is called with the argument string.

    Args:
        custom_uri: The plugin pseudo-URI to parse.

    Returns:
        The object produced by calling the registered handler class with the
        argument string.

    Raises:
        ValueError: If the string is not a well-formed pseudo-URI, or if no
            handler is registered under the entry-point name.
    """
    invalid_msg = f"Invalid custom validator plugin pseudo-URI: '{custom_uri}'"

    if not custom_uri.startswith("v#"):
        raise ValueError(invalid_msg)
    try:
        # Unpacking fails (ValueError) unless "://" occurs exactly once.
        ep, args = custom_uri[2:].split("://")
    except ValueError:
        raise ValueError(invalid_msg) from None
    if ep == "":
        raise ValueError(invalid_msg)

    # Only the registry lookup is guarded: a KeyError raised while constructing
    # the handler must not be misreported as a missing entry-point.
    try:
        h: Type[ValidationHandler] = loaded_handlers[ep]
    except KeyError:
        raise ValueError(f"Validator entry-point not found: '{ep}'") from None
    return h(args)
def validate_custom(dat, plugin_str: str) -> JSONValidationErrors:
    """Validate ``dat`` with the handler described by a plugin pseudo-URI.

    The handler is obtained via ``plugin_from_uri``. Depending on its
    ``_for_json`` flag, either its ``validate_json`` or its ``validate_raw``
    method is called with ``dat`` and the handler's ``args``.
    """
    handler = plugin_from_uri(plugin_str)
    run = handler.validate_json if handler._for_json else handler.validate_raw
    return run(dat, handler.args)
def validate_jsonschema(dat, schema: Union[bool, Dict]) -> JSONValidationErrors:
    """Validate ``dat`` against a JSON Schema using the Draft 2020-12 validator.

    Returns a dict whose keys are "/"-joined paths into ``dat`` (with a leading
    "/") and whose values are the error messages reported at that path.
    Errors are visited in order of their path.
    """
    validator = Draft202012Validator(schema=schema)  # type: ignore
    found = sorted(validator.iter_errors(dat), key=lambda e: e.path)  # type: ignore

    by_pointer: Dict[str, List[str]] = {}
    for err in found:
        # JSON Pointer into the document
        pointer = "/" + "/".join(str(part) for part in err.path)
        by_pointer.setdefault(pointer, []).append(err.message)
    return by_pointer
def resolve_validator(
    schema_or_ref: Union[bool, str, Dict],
    *,
    local_basedir: Optional[Path] = None,
    relative_prefix: str = "",
) -> Union[bool, Dict, ValidationHandler]:
    """Turn the passed object into a schema or a custom validator.

    A bool or dict is treated as an embedded schema and returned unchanged.
    A string starting with `v#` is handed to `plugin_from_uri` to instantiate
    a custom validator. Any other string is converted to a URI and the
    referenced schema is loaded from it.
    """
    if isinstance(schema_or_ref, (bool, dict)):
        # already an embedded schema, nothing to resolve
        return schema_or_ref

    if schema_or_ref.startswith("v#"):
        # custom validation plugin, not a JSON Schema
        return plugin_from_uri(schema_or_ref)

    # plain reference: resolve it to a URI and load the schema from there
    schema_uri = to_uri(schema_or_ref, local_basedir, relative_prefix)
    return load_json(schema_uri, local_basedir=local_basedir)
def validate_metadata(
    dat,
    schema: Union[bool, str, Dict, ValidationHandler],
    *,
    local_basedir: Optional[Path] = None,
    relative_prefix: str = "",
) -> JSONValidationErrors:
    """Validate an object (dict or byte stream) with a JSON Schema or custom validator.

    ``schema`` may be an embedded JSON Schema (bool or dict), a validation
    handler instance, or a string. A string is first resolved with
    ``resolve_validator`` and may therefore reference a JSON Schema or
    describe a custom validator handler.

    Returns a dict mapping from JSON Pointers to a list of errors in that location.
    If the dict is empty, no validation errors were detected.
    """
    validator = schema
    if isinstance(validator, str):
        validator = resolve_validator(
            validator, local_basedir=local_basedir, relative_prefix=relative_prefix
        )

    if isinstance(validator, ValidationHandler):
        return validator.validate(dat)
    return validate_jsonschema(dat, validator)