Coverage for src/dirschema/json/validate.py: 98%
51 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-07 09:34 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-07 09:34 +0000
1"""Helper functions to perform validation of JSON-compatible metadata files."""
3from pathlib import Path
4from typing import Dict, List, Optional, Type, Union
6from jsonschema import Draft202012Validator
8from .handler import ValidationHandler
9from .handlers import loaded_handlers
10from .parse import load_json, to_uri
12JSONValidationErrors = Dict[str, List[str]]
13"""JSON validation errors: a mapping from JSON Pointers to lists of error messages."""
def plugin_from_uri(custom_uri: str) -> ValidationHandler:
    """Parse a validation plugin pseudo-URI and return the instantiated handler.

    The accepted form is ``v#<entry-point>://<args>``. The entry-point name is
    looked up in `loaded_handlers` and the class found there is called with
    the args string. Only the first ``://`` separates the entry-point from the
    args, so the args themselves may contain ``://`` (e.g. an URL).

    Raises:
        ValueError: If the string does not start with ``v#``, has no ``://``
            separator, has an empty entry-point name, or names an entry-point
            that is not present in `loaded_handlers`.
    """
    prefix = "v#"
    well_formed = custom_uri.startswith(prefix)
    # partition splits on the first "://" only; `sep` is empty if it is absent
    ep, sep, args = custom_uri[len(prefix):].partition("://")
    if not (well_formed and sep and ep):
        msg = f"Invalid custom validator plugin pseudo-URI: '{custom_uri}'"
        raise ValueError(msg)

    # Guard only the registry lookup, so that a KeyError raised while the
    # handler is being constructed is not misreported as a missing entry-point.
    try:
        handler_cls: Type[ValidationHandler] = loaded_handlers[ep]
    except KeyError:
        raise ValueError(f"Validator entry-point not found: '{ep}'") from None
    return handler_cls(args)
def validate_custom(dat, plugin_str: str) -> JSONValidationErrors:
    """Validate data with the handler described by a plugin pseudo-URI.

    The handler is obtained from `plugin_from_uri`. Depending on its
    `_for_json` flag, either its `validate_json` or its `validate_raw`
    method is called with the data and the handler's `args`.
    """
    handler = plugin_from_uri(plugin_str)
    # pick the entry point matching the kind of input the handler expects
    check = handler.validate_json if handler._for_json else handler.validate_raw
    return check(dat, handler.args)
def validate_jsonschema(dat, schema: Union[bool, Dict]) -> JSONValidationErrors:
    """Validate data against a JSON Schema using `Draft202012Validator`.

    Returns a dict that groups the validation error messages by the location
    of the offending value. The dict is empty if no errors were reported.

    Location keys are built by joining the segments of the error path with
    ``/`` behind a leading ``/``; an error with an empty path is therefore
    stored under ``"/"``. Segments are joined as-is, without any escaping.
    """
    validator = Draft202012Validator(schema=schema)  # type: ignore
    failures = sorted(
        validator.iter_errors(dat), key=lambda err: err.path  # type: ignore
    )

    grouped: Dict[str, List[str]] = {}
    for failure in failures:
        # location of the failing value inside the validated document
        pointer = "/" + "/".join(str(segment) for segment in failure.path)
        grouped.setdefault(pointer, []).append(failure.message)
    return grouped
def resolve_validator(
    schema_or_ref: Union[bool, str, Dict],
    *,
    local_basedir: Optional[Path] = None,
    relative_prefix: str = "",
) -> Union[bool, Dict, ValidationHandler]:
    """Turn a schema or a reference string into something usable for validation.

    A bool or dict is treated as an embedded schema and returned unchanged.
    A string starting with `v#` is handed to `plugin_from_uri` to build a
    custom validator. Any other string is converted with `to_uri` (using
    `local_basedir` and `relative_prefix`) and loaded with `load_json`.
    """
    if isinstance(schema_or_ref, (bool, dict)):
        # already an embedded schema, nothing to resolve
        return schema_or_ref

    if schema_or_ref.startswith("v#"):
        # custom validation plugin, not a JSON Schema
        return plugin_from_uri(schema_or_ref)

    # everything else is a reference to a JSON Schema that must be loaded
    schema_uri = to_uri(schema_or_ref, local_basedir, relative_prefix)
    return load_json(schema_uri, local_basedir=local_basedir)
def validate_metadata(
    dat,
    schema: Union[bool, str, Dict, ValidationHandler],
    *,
    local_basedir: Optional[Path] = None,
    relative_prefix: str = "",
) -> JSONValidationErrors:
    """Validate an object using a JSON Schema or a custom validation handler.

    If `schema` is a string, it is first resolved with `resolve_validator`
    (forwarding `local_basedir` and `relative_prefix`); any other value is
    used as it is. A `ValidationHandler` instance is then asked to validate
    the data itself, everything else is passed on to `validate_jsonschema`
    as the schema.

    Returns a dict mapping from JSON Pointers to a list of errors in that location.
    If the dict is empty, no validation errors were detected.
    """
    validator = (
        resolve_validator(
            schema, local_basedir=local_basedir, relative_prefix=relative_prefix
        )
        if isinstance(schema, str)
        else schema
    )

    if not isinstance(validator, ValidationHandler):
        # plain JSON Schema (bool or dict)
        return validate_jsonschema(dat, validator)
    return validator.validate(dat)