Coverage for src/dirschema/validate.py: 85%
237 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-07 09:34 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-07 09:34 +0000
1"""Validation API functionality for DirSchema."""
2from __future__ import annotations
4import copy
5import io
6import json
7import re
8from pathlib import Path
9from typing import Any, Dict, List, Optional, Tuple, Union
11from pydantic import BaseModel
12from ruamel.yaml import YAML
14from .adapters import IDirectory, get_adapter_for
15from .core import DSRule, MetaConvention, PathSlice, Rule, TypeEnum
16from .json.parse import load_json, to_uri
17from .json.validate import (
18 JSONValidationErrors,
19 ValidationHandler,
20 resolve_validator,
21 validate_metadata,
22)
23from .log import logger
# Module-wide YAML serializer used by `DSValidator.format_errors`:
# the "safe" flavour of ruamel.yaml, configured to emit block style
# (one key per line) instead of inline flow style.
yaml = YAML(typ="safe")
yaml.default_flow_style = False
def loc_to_jsonpointer(lst) -> str:
    """Convert a list of string keys and int indices to a JSON Pointer string.

    Every element is stringified and escaped as an RFC 6901 reference token
    (``~`` becomes ``~0`` and ``/`` becomes ``~1``), so keys that contain one
    of these characters do not produce an ambiguous pointer.

    Args:
        lst: Iterable of keys (str) and indices (int) describing a location.

    Returns:
        All tokens joined by ``/`` and prefixed with ``/``. An empty location
        is rendered as ``"/"``.
    """
    # order matters: "~" has to be escaped before "/" is turned into "~1"
    tokens = (str(tok).replace("~", "~0").replace("/", "~1") for tok in lst)
    return "/" + "/".join(tokens)
def json_dict(model, **kwargs):
    """Return a plain, JSON-compatible dict for the given Pydantic model.

    The model is first serialized through its own ``json()`` method (all
    keyword arguments are forwarded to it) and the resulting string is parsed
    back again. This round-trip leaves only JSON-native values, unlike
    `BaseModel.dict()`, which may contain non-JSON entities.
    """
    serialized = model.json(**kwargs)
    return json.loads(serialized)
class DSValidationError(BaseModel):
    """One validation failure reported by DirSchema for a single entity."""

    path: str
    """Path of the file that was checked (may be the outcome of applied rewrites)."""

    err: Union[str, JSONValidationErrors]
    """The error itself: a plain message, or a dict of refined validation errors."""
DSValidationErrors = Dict[Tuple[Union[str, int], ...], DSValidationError]
"""Errors keyed by their location inside the schema.

Each key is a tuple of string keys and int indices; following it through the
schema (loaded as a JSON dict) leads to the sub-rule that produced the error.
"""

DSValidationResult = Dict[str, DSValidationErrors]
"""Overall validation outcome: file/directory path -> errors for that path.

Only entities for which validation failed have an entry.
"""
class DSEvalCtx(BaseModel):
    """DirSchema evaluation context, used like a Reader Monad.

    Contains information that is required to evaluate a rule for a path:
    the directory adapter and metadata convention, the errors collected so
    far, the location of the current rule inside the schema, and the match
    configuration that sub-rules take over from their parent rules.
    """

    class Config:  # noqa: D106
        # IDirectory is not a pydantic type, so it has to be allowed explicitly
        arbitrary_types_allowed = True

    dirAdapter: IDirectory
    """Adapter to access metadata files and get paths from."""

    metaConvention: MetaConvention = MetaConvention()
    """Convention to use for validMeta."""

    # ----

    errors: DSValidationErrors = {}
    """Errors collected in this context, keyed by schema location."""

    failed: bool = False
    """Set to True once a constraint of the current rule was violated."""

    filePath: str = ""
    """Path of currently checked file (possibly rewritten)."""

    location: List[Union[str, int]] = []
    """Relative location of current rule."""

    # passed down from parent rule / overridden with current rule:

    matchStart: int = 0
    matchStop: int = 0
    matchPat: Optional[re.Pattern] = None

    @classmethod
    def fresh(cls, rule: DSRule, **kwargs):
        """Initialize a fresh evaluation context.

        Args:
            rule: Root rule whose match settings seed the ``match*`` fields.
            **kwargs: Values for the remaining fields of the context.

        Returns:
            A new context for evaluating `rule`.
        """
        ret = cls(**kwargs)  # initialize most fields from passed kwargs
        return ret.descend(rule)  # initialize match* fields from rule

    def descend(
        self,
        rule: DSRule,
        filepath: Optional[str] = None,
        reachedVia: Optional[Any] = None,
    ) -> DSEvalCtx:
        """Return a new context updated with fields from the given rule.

        Input must be the next sub-rule, the possibly rewritten entity path
        and the key in the parent rule that is used to access the sub-rule.

        This will not preserve the parent errors (use `add_errors` to merge).

        Args:
            rule: Sub-rule that is going to be evaluated with the new context.
            filepath: New value for `filePath`; the current one is kept if None.
            reachedVia: Key or index appended to `location`, unless it is None.

        Returns:
            A copy of this context with an empty error dict, its own location
            list and the match settings overridden by `rule` where it sets them.
        """
        ret = self.copy()
        # the copy must not share mutable state with the parent context
        ret.errors = {}
        ret.location = list(self.location)

        if isinstance(rule.__root__, Rule):
            # override match configuration and pattern, if specified in child rule
            rl: Rule = rule.__root__
            # NOTE(review): these are truthiness checks, so an explicit 0 in the
            # child rule does not reset an inherited value - confirm intended.
            if rl.matchStart:
                ret.matchStart = rl.matchStart
            if rl.matchStop:
                # the stop index goes into matchStop (not into matchStart)
                ret.matchStop = rl.matchStop
            if rl.match:
                ret.matchPat = rl.match

        if filepath is not None:
            ret.filePath = filepath

        if reachedVia is not None:
            ret.location.append(reachedVia)

        return ret

    def add_error(
        self,
        err: Any,
        child: Optional[Union[str, int]] = None,
        path: Optional[str] = None,
    ):
        """Add an error object at current location.

        Will extend current location with `child`, if given,
        will use passed `path`, if given.

        Args:
            err: Error message or error object to store.
            child: Extra key or index appended to the location for this error.
            path: Path reported in the error; an empty or missing value falls
                back to `filePath`.
        """
        loc = self.location if child is None else self.location + [child]
        fp = path or self.filePath
        # an error already stored under the same location is replaced
        self.errors[tuple(loc)] = DSValidationError(path=fp, err=err)

    def add_errors(self, *err_dicts):
        """Merge all passed error dicts into the errors of this context.

        Later dicts win if the same location occurs more than once.
        """
        for err_dict in err_dicts:
            self.errors.update(err_dict)
class DSValidator:
    """Validator class that performs dirschema validation for a given dirschema."""

    def __init__(
        self,
        schema: Union[bool, Rule, DSRule, str, Path],
        meta_conv: Optional[MetaConvention] = None,
        local_basedir: Optional[Path] = None,
        relative_prefix: str = "",
    ) -> None:
        """Construct validator instance from given schema or schema location.

        Accepts DSRule, raw bool or Rule, or a str/Path that is interpreted as location.

        Args:
            schema: The dirschema itself, or the location to load it from.
            meta_conv: Metadata file convention; a default `MetaConvention`
                is created if none is passed.
            local_basedir: Base directory forwarded to the schema loader and
                to the resolution of validators referenced by rules.
            relative_prefix: Prefix forwarded to the same functions.

        Raises:
            ValueError: If `schema` has none of the supported types.
        """
        self.meta_conv = meta_conv or MetaConvention()
        self.local_basedir = local_basedir
        self.relative_prefix = relative_prefix

        # if the passed relative prefix is a custom plugin, we cannot use this
        # for $ref resolving, so we will ignore it in the Json/Yaml loader
        # NOTE(review): assumes plugin prefixes look like "v#NAME://..." -
        # confirm against the plugin pseudo-URI convention.
        is_plugin_prefix = (
            relative_prefix.startswith("v#") and "://" in relative_prefix
        )

        # take care of the passed schema based on its type
        if isinstance(schema, (bool, Rule)):
            self.schema = DSRule(__root__=schema)
        elif isinstance(schema, DSRule):
            self.schema = schema
        elif isinstance(schema, (str, Path)):
            uri = to_uri(str(schema), self.local_basedir, self.relative_prefix)
            dat = load_json(
                uri,
                local_basedir=self.local_basedir,
                relative_prefix="" if is_plugin_prefix else self.relative_prefix,
            )
            # use deepcopy to get rid of jsonref (see jsonref issue #9)
            # otherwise we will get problems with pydantic serialization later
            self.schema = DSRule.parse_obj(copy.deepcopy(dat))
        else:
            raise ValueError(f"Do not know how to process provided schema: {schema}")

        logger.debug(
            "Initialized dirschema validator\n"
            f"schema: {self.schema}\n"
            f"meta_conv: {self.meta_conv}\n"
            f"local_basedir: {self.local_basedir}\n"
        )

    @classmethod
    def errors_to_json(cls, errs: DSValidationResult) -> Dict[str, Any]:
        """Convert the validation result to a JSON-compatible dict.

        Resulting structure is (file path -> schema location -> error message or dict).
        The schema location is rendered as a JSON Pointer string.
        """
        return {
            file_path: {
                loc_to_jsonpointer(err_loc): json_dict(err_obj, exclude_defaults=True)
                for err_loc, err_obj in file_errors.items()
            }
            for file_path, file_errors in errs.items()
        }

    @classmethod
    def format_errors(cls, errs: DSValidationResult, stream=None) -> Optional[str]:
        """Report errors as YAML output.

        If a stream is provided, prints it out. Otherwise, returns it as string.

        Args:
            errs: Validation result as returned by `validate`.
            stream: Optional writable text stream receiving the YAML output.

        Returns:
            The YAML document as a string if no stream was passed, else None.
        """
        if stream is not None:
            yaml.dump(cls.errors_to_json(errs), stream)
            return None

        # no stream passed -> render into a buffer and hand back its content
        buf = io.StringIO()
        yaml.dump(cls.errors_to_json(errs), buf)
        return buf.getvalue()

    def validate(
        self, root_path: Union[Path, IDirectory], **kwargs
    ) -> DSValidationResult:
        """Validate a directory, return all validation errors (unsatisfied rules).

        If `root_path` is an instance of `IDirectory`, it will be used directly.

        If `root_path` is a `Path`, this function will try to pick the correct
        interface for interpreting "files" and "directories", depending on
        whether the provided file is a directory or a supported kind of archive
        file with internal structure.

        Depending on the used metadata convention, the companion metadata files
        matching the convention will be filtered out from the set of validated
        paths.

        Additional keyword arguments are passed on to the evaluation context
        that is created for every validated path.

        Returns:
            Error dict that is empty in case of success, or otherwise contains
            for each path with validation errors another dict with the errors.
        """
        logger.debug(f"validate '{root_path}' ...")
        if isinstance(root_path, Path):
            root_path = get_adapter_for(root_path)
        paths = [p for p in root_path.get_paths() if not self.meta_conv.is_meta(p)]
        errors: DSValidationResult = {}
        # run validation for each filepath, collect errors separately
        for p in paths:
            ctx = DSEvalCtx.fresh(
                self.schema,
                dirAdapter=root_path,
                metaConvention=self.meta_conv,
                filePath=p,
                **kwargs,
            )
            logger.debug(f"validate_path '{p}' ...")
            success = self.validate_path(p, self.schema, ctx)
            logger.debug(f"validate_path '{p}' -> {success}")
            if not success:
                # a failure without any recorded error still must be reported
                errors[p] = ctx.errors or {
                    (): DSValidationError(
                        path=p, err="Validation failed (no error log available)."
                    )
                }
        return errors

    def validate_path(self, path: str, rule: DSRule, curCtx: DSEvalCtx) -> bool:
        """Apply rule to path of file/directory under given evaluation context.

        Will collect errors in the context object.

        Note that not all errors might be reported, as the sub-rules are
        evaluated in different stages and each stage aborts evaluation on
        failure (i.e. match/rewrite, primitive rules, complex logic rules,
        `next` sub-rule)

        Args:
            path: Path of the entity the rule is applied to.
            rule: Rule to evaluate (a trivial bool rule or a full rule object).
            curCtx: Evaluation context; its `errors` and `failed` fields are
                updated in place.

        Returns:
            True iff validation of this rule was successful.
        """
        logger.debug(f"validate_path '{path}', at rule location: {curCtx.location}")

        # special case: trivial bool rule
        if isinstance(rule.__root__, bool):
            logger.debug(f"{curCtx.location} trivial rule")
            if not rule.__root__:
                curCtx.failed = True
                curCtx.add_error("Reached unsatisfiable 'false' rule")
            return not curCtx.failed

        rl = rule.__root__  # unpack rule
        # assert isinstance(rl, Rule)

        # 1. match / rewrite
        # if rewrite is set, don't need to do separate match,just try rewriting
        # match/rewrite does not produce an error on its own, but can fail
        # because "match failure" is usually not "validation failure"
        psl = PathSlice.into(path, curCtx.matchStart, curCtx.matchStop)
        nextPath: str = path  # to be used for implication later on
        if rl.match or rl.rewrite:
            # important! using the match pattern from the context (could be inherited)
            rewritten = psl.rewrite(curCtx.matchPat, rl.rewrite)
            if rewritten is not None:
                nextPath = rewritten.unslice()
            else:  # failed match or rewrite
                op = "rewrite" if rl.rewrite else "match"
                pat = curCtx.matchPat or psl._def_pat
                matchPat = f"match '{pat.pattern}'"
                rwPat = f" and rewrite to '{str(rl.rewrite)}'" if rl.rewrite else ""

                if rl.description:  # add custom error without expanding groups
                    curCtx.add_error(rl.description, op)
                else:
                    curCtx.add_error(f"Failed to {matchPat}{rwPat}", op)
                curCtx.failed = True
                return False

        # 2. proceed with the other primitive constraints

        def add_error(*args):
            """If desc is set, add desc error once and else add passed error.

            In every case the context is marked as failed afterwards. An empty
            description suppresses the error message, but not the failure.
            """
            if rl.description is None:
                curCtx.add_error(*args)
            elif rl.description != "" and not curCtx.failed:
                # add error with expanded groups for better error messages
                curCtx.add_error(psl.match(curCtx.matchPat).expand(rl.description))
            curCtx.failed = True

        # take care of type constraint
        is_file = curCtx.dirAdapter.is_file(path)
        is_dir = curCtx.dirAdapter.is_dir(path)
        if rl.type is not None and not rl.type.is_satisfied(is_file, is_dir):
            msg = f"Entity does not have expected type: '{rl.type.value}'"
            if rl.type == TypeEnum.ANY:
                msg = "Entity must exist (type: true)"
            elif rl.type == TypeEnum.MISSING:
                msg = "Entity must not exist (type: false)"
            add_error(msg, "type", None)

        # take care of metadata JSON Schema validation constraint
        for key in ("valid", "validMeta"):
            if rl.__dict__[key] is None:  # attribute not set
                continue

            if not is_file and not is_dir:
                add_error(f"Path '{path}' does not exist", key, None)
                continue

            # use metadata convention for validMeta
            metapath = path
            if key == "validMeta":
                metapath = curCtx.metaConvention.meta_for(path, is_dir=is_dir)

            # load metadata file
            dat = curCtx.dirAdapter.open_file(metapath)
            if dat is None:
                add_error(f"File '{metapath}' could not be loaded", key, metapath)
                continue

            # prepare correct validation method (JSON Schema or custom plugin)
            schema_or_plugin = resolve_validator(
                rl.__dict__[key],
                local_basedir=self.local_basedir,
                relative_prefix=self.relative_prefix,
            )

            # check whether loaded metadata file should be parsed as JSON
            parse_json = (
                not isinstance(schema_or_plugin, ValidationHandler)
                or not schema_or_plugin._for_json
            )
            if parse_json:
                # not a handler plugin for raw data -> load as JSON
                dat = curCtx.dirAdapter.decode_json(dat, metapath)
                if dat is None:
                    add_error(f"File '{metapath}' could not be parsed", key, metapath)
                    continue

            valErrs = validate_metadata(dat, schema_or_plugin)
            if valErrs:
                add_error(valErrs, key, metapath)

        if curCtx.failed:
            return False  # stop validation if primitive checks failed

        # 3. check the complex constraints

        # if-then-else
        if rl.if_ is not None:
            ifCtx = curCtx.descend(rl.if_, path, "if")
            # errors of the 'if' rule itself are never reported, it only
            # selects which of the two branches has to hold
            if self.validate_path(path, rl.if_, ifCtx):
                if rl.then is not None:
                    thenCtx = curCtx.descend(rl.then, path, "then")
                    if not self.validate_path(path, rl.then, thenCtx):
                        curCtx.failed = True
                        if rl.details:
                            curCtx.add_errors(thenCtx.errors)
            else:
                if rl.else_ is not None:
                    elseCtx = curCtx.descend(rl.else_, path, "else")
                    if not self.validate_path(path, rl.else_, elseCtx):
                        curCtx.failed = True
                        if rl.details:
                            curCtx.add_errors(elseCtx.errors)

        # logical operators
        for op in ("allOf", "anyOf", "oneOf"):
            val = rl.__dict__[op]

            num_rules = len(val)
            if num_rules == 0:
                continue  # empty list of rules -> nothing to do

            opCtx = curCtx.descend(rule, None, op)

            num_fails = 0
            suberrs: List[DSValidationErrors] = []
            for idx, r in enumerate(val):
                subCtx = opCtx.descend(r, None, idx)
                success = self.validate_path(path, r, subCtx)
                if success and op == "anyOf":
                    suberrs = []  # don't care about the errors on success
                    break  # we have a satisfied rule -> enough
                elif not success:
                    num_fails += 1
                    if subCtx.errors:
                        suberrs.append(subCtx.errors)

            num_sat = num_rules - num_fails
            err_msg = ""
            if op == "allOf" and num_fails > 0:
                err_msg = "All"
            elif op == "oneOf" and num_fails != num_rules - 1:
                err_msg = "Exactly 1"
            elif op == "anyOf" and num_fails == num_rules:
                err_msg = "At least 1"
            if err_msg:
                err_msg += f" of {num_rules} sub-rules must be satisfied "
                err_msg += f"(satisfied: {num_sat})"
                add_error(err_msg, op, None)
                if rl.details:
                    curCtx.add_errors(*suberrs)

        if rl.not_ is not None:
            notCtx = curCtx.descend(rl.not_, path, "not")
            if self.validate_path(path, rl.not_, notCtx):
                add_error(
                    "Negated sub-rule satisfied, but should have failed", "not", None
                )

        if curCtx.failed:
            return False  # stop validation here if logical expressions failed

        # 4. perform "next" rule, on possibly rewritten path
        if rl.next is not None:
            nextCtx = curCtx.descend(rl.next, nextPath, "next")
            if not self.validate_path(nextPath, rl.next, nextCtx):
                if rl.details:
                    curCtx.add_errors(nextCtx.errors)
                return False

        # assert curCtx.failed == False
        return True