Coverage for src/dirschema/validate.py: 85%

237 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-07 09:34 +0000

1"""Validation API functionality for DirSchema.""" 

2from __future__ import annotations 

3 

4import copy 

5import io 

6import json 

7import re 

8from pathlib import Path 

9from typing import Any, Dict, List, Optional, Tuple, Union 

10 

11from pydantic import BaseModel 

12from ruamel.yaml import YAML 

13 

14from .adapters import IDirectory, get_adapter_for 

15from .core import DSRule, MetaConvention, PathSlice, Rule, TypeEnum 

16from .json.parse import load_json, to_uri 

17from .json.validate import ( 

18 JSONValidationErrors, 

19 ValidationHandler, 

20 resolve_validator, 

21 validate_metadata, 

22) 

23from .log import logger 

24 

# Module-level YAML serializer instance; `DSValidator.format_errors` dumps with it.
yaml = YAML(typ="safe")
# presumably requests block-style (non-inline) output for dumped collections
yaml.default_flow_style = False

27 

28 

def loc_to_jsonpointer(lst) -> str:
    """Convert a list of string keys and int indices to a JSON Pointer string.

    Every entry is converted with `str` and escaped as a JSON Pointer
    reference token (`~` becomes `~0`, `/` becomes `~1`), so that keys
    containing these characters are not mistaken for extra path segments.

    Note that an empty sequence yields `"/"`; this is kept unchanged for
    backward compatibility of the produced error reports.
    """
    # "~" must be escaped first, otherwise the "~" of a freshly inserted "~1"
    # would be escaped a second time
    tokens = (str(seg).replace("~", "~0").replace("/", "~1") for seg in lst)
    return "/" + "/".join(tokens)

32 

33 

def json_dict(model, **kwargs):
    """Return a plain, JSON-compatible dict representation of a Pydantic model.

    The model is serialized to a JSON string (all keyword arguments are
    forwarded to its `json()` method) and that string is parsed again.
    The round-trip strips every non-JSON entity, which the dicts produced by
    `BaseModel.dict()` may still contain.
    """
    serialized = model.json(**kwargs)
    return json.loads(serialized)

41 

42 

class DSValidationError(BaseModel):
    """One validation failure reported by DirSchema."""

    path: str
    """Path of the evaluated file (may be the outcome of applied rewrites)."""

    err: Union[str, JSONValidationErrors]
    """The error itself: a plain message, or a dict of refined validation errors."""

51 

52 

# Location of a (sub-)rule inside the schema: a sequence of keys and list indices.
_SchemaLocation = Tuple[Union[str, int], ...]

DSValidationErrors = Dict[_SchemaLocation, DSValidationError]
"""Errors of a single entity, keyed by the schema location that produced them.

When the schema is loaded as a JSON dict, each key can be used to navigate
to the corresponding sub-rule.
"""

DSValidationResult = Dict[str, DSValidationErrors]
"""Overall validation result: for every file/directory path whose validation
failed, the validation errors collected for that path.
"""

64 

65 

class DSEvalCtx(BaseModel):
    """DirSchema evaluation context, used like a Reader Monad.

    Contains information that is required to evaluate a rule for a path.
    For every sub-rule a new context is derived with `descend`; each derived
    context starts with its own empty error dict.
    """

    class Config:  # noqa: D106
        # permit fields whose types are not pydantic models (e.g. the adapter)
        arbitrary_types_allowed = True

    dirAdapter: IDirectory
    """Adapter to access metadata files and get paths from."""

    metaConvention: MetaConvention = MetaConvention()
    """Convention to use for validMeta."""

    # ----

    # errors collected in this context, keyed by schema location
    errors: DSValidationErrors = {}
    # set to True by the validator as soon as a check in this context fails
    failed: bool = False

    filePath: str = ""
    """Path of currently checked file (possibly rewritten)."""

    location: List[Union[str, int]] = []
    """Relative location of current rule."""

    # passed down from parent rule / overridden with current rule:

    matchStart: int = 0
    matchStop: int = 0
    matchPat: Optional[re.Pattern] = None

    @classmethod
    def fresh(cls, rule: DSRule, **kwargs) -> DSEvalCtx:
        """Initialize a fresh evaluation context.

        Args:
            rule: Root rule; its match settings initialize the `match*` fields.
            **kwargs: Values for the remaining fields of the context.

        Returns:
            A new context that is set up for evaluating `rule`.
        """
        ret = cls(**kwargs)  # initialize most fields from passed kwargs
        return ret.descend(rule)  # initialize match* fields from rule

    def descend(
        self,
        rule: DSRule,
        filepath: Optional[str] = None,
        reachedVia: Optional[Any] = None,
    ) -> DSEvalCtx:
        """Return a new context updated with fields from the given rule.

        Input must be the next sub-rule, the possibly rewritten entity path
        and the key in the parent rule that is used to access the sub-rule.

        This will not preserve the parent errors (use `add_errors` to merge).

        Args:
            rule: The sub-rule that will be evaluated with the new context.
            filepath: New value for `filePath`; the current one is kept if None.
            reachedVia: Key or index appended to `location`, unless it is None.

        Returns:
            The derived context; this context is left unmodified.
        """
        ret = self.copy()
        # the copy must not share the mutable containers with its parent
        ret.errors = {}
        ret.location = list(self.location)

        if isinstance(rule.__root__, Rule):
            # override match configuration and pattern, if specified in child rule
            # NOTE(review): falsy values (e.g. an explicit 0) are treated like
            # "not specified" and therefore do not override inherited values
            rl: Rule = rule.__root__
            if rl.matchStart:
                ret.matchStart = rl.matchStart
            if rl.matchStop:
                ret.matchStop = rl.matchStop
            if rl.match:
                ret.matchPat = rl.match

        if filepath is not None:
            ret.filePath = filepath

        if reachedVia is not None:
            ret.location.append(reachedVia)

        return ret

    def add_error(
        self,
        err: Any,
        child: Optional[Union[str, int]] = None,
        path: Optional[str] = None,
    ):
        """Add an error object at current location.

        Will extend current location with `child`, if given,
        will use passed `path`, if given.

        An error that is already stored for the same location is replaced.
        """
        loc = self.location if child is None else self.location + [child]
        fp = path or self.filePath
        self.errors[tuple(loc)] = DSValidationError(path=fp, err=err)

    def add_errors(self, *err_dicts):
        """Merge all passed error dicts into the errors of this context.

        The dicts are applied in order, so for equal locations later ones win.
        """
        for err_dict in err_dicts:
            self.errors.update(err_dict)

158 

159 

class DSValidator:
    """Validator class that performs dirschema validation for a given dirschema."""

    def __init__(
        self,
        schema: Union[bool, Rule, DSRule, str, Path],
        meta_conv: Optional[MetaConvention] = None,
        local_basedir: Optional[Path] = None,
        relative_prefix: str = "",
    ) -> None:
        """Construct validator instance from given schema or schema location.

        Accepts DSRule, raw bool or Rule, or a str/Path that is interpreted as location.

        Args:
            schema: The schema itself, or the location to load it from.
            meta_conv: Metadata convention; a default `MetaConvention` if not given.
            local_basedir: Passed through to the schema and validator loaders.
            relative_prefix: Passed through to the schema and validator loaders.

        Raises:
            ValueError: If `schema` has none of the supported types.
        """
        self.meta_conv = meta_conv or MetaConvention()
        self.local_basedir = local_basedir
        self.relative_prefix = relative_prefix

        # if the passed relative prefix is a custom plugin, we cannot use this
        # for $ref resolving, so we will ignore it in the Json/Yaml loader
        is_plugin_prefix = self._is_plugin_prefix(relative_prefix)

        # take care of the passed schema based on its type
        if isinstance(schema, (bool, Rule)):
            self.schema = DSRule(__root__=schema)
        elif isinstance(schema, DSRule):
            self.schema = schema
        elif isinstance(schema, (str, Path)):
            uri = to_uri(str(schema), self.local_basedir, self.relative_prefix)
            dat = load_json(
                uri,
                local_basedir=self.local_basedir,
                relative_prefix="" if is_plugin_prefix else self.relative_prefix,
            )
            # use deepcopy to get rid of jsonref (see jsonref issue #9)
            # otherwise we will get problems with pydantic serialization later
            self.schema = DSRule.parse_obj(copy.deepcopy(dat))
        else:
            raise ValueError(f"Do not know how to process provided schema: {schema}")

        logger.debug(
            "Initialized dirschema validator\n"
            f"schema: {self.schema}\n"
            f"meta_conv: {self.meta_conv}\n"
            f"local_basedir: {self.local_basedir}\n"
        )

    @staticmethod
    def _is_plugin_prefix(prefix: str) -> bool:
        """Return whether the prefix starts with `v#` and contains `://`.

        NOTE(review): presumably this is the pseudo-URI form that refers to a
        custom validation plugin -- confirm against the validator resolution.
        """
        return prefix.startswith("v#") and "://" in prefix

    @classmethod
    def errors_to_json(cls, errs: DSValidationResult) -> Dict[str, Any]:
        """Convert the validation result to a JSON-compatible dict.

        Resulting structure is (file path -> schema location -> error message or dict).
        Schema locations are represented as JSON Pointer strings.
        """
        return {
            file_path: {
                loc_to_jsonpointer(err_loc): json_dict(err_obj, exclude_defaults=True)
                for err_loc, err_obj in file_errors.items()
            }
            for file_path, file_errors in errs.items()
        }

    @classmethod
    def format_errors(cls, errs: DSValidationResult, stream=None) -> Optional[str]:
        """Report errors as YAML output.

        If a stream is provided, prints it out. Otherwise, returns it as string.
        """
        # compare against None, so that a stream object that happens to be
        # falsy is still written to
        of = io.StringIO() if stream is None else stream
        yaml.dump(cls.errors_to_json(errs), of)
        if stream is None:
            return of.getvalue()
        return None

    def validate(
        self, root_path: Union[Path, IDirectory], **kwargs
    ) -> DSValidationResult:
        """Validate a directory, return all validation errors (unsatisfied rules).

        If `root_path` is an instance of `IDirectory`, it will be used directly.

        If `root_path` is a `Path`, this function will try to pick the correct
        interface for interpreting "files" and "directories", depending on
        whether the provided file is a directory or a supported kind of archive
        file with internal structure.

        Depending on the used metadata convention, the companion metadata files
        matching the convention will be filtered out from the set of validated
        paths.

        Additional keyword arguments are passed on to `DSEvalCtx.fresh`.

        Returns
            Error dict that is empty in case of success, or otherwise contains
            for each path with validation errors another dict with the errors.
        """
        logger.debug(f"validate '{root_path}' ...")
        if isinstance(root_path, Path):
            root_path = get_adapter_for(root_path)
        paths = [p for p in root_path.get_paths() if not self.meta_conv.is_meta(p)]
        errors: Dict[str, Any] = {}
        # run validation for each filepath, collect errors separately
        for p in paths:
            ctx = DSEvalCtx.fresh(
                self.schema,
                dirAdapter=root_path,
                metaConvention=self.meta_conv,
                filePath=p,
                **kwargs,
            )
            logger.debug(f"validate_path '{p}' ...")
            success = self.validate_path(p, self.schema, ctx)
            logger.debug(f"validate_path '{p}' -> {success}")
            if not success:
                # a failure can come without any message, use a generic one then
                errors[p] = ctx.errors or {
                    (): DSValidationError(
                        path=p, err="Validation failed (no error log available)."
                    )
                }
        return errors

    def validate_path(self, path: str, rule: DSRule, curCtx: DSEvalCtx) -> bool:
        """Apply rule to path of file/directory under given evaluation context.

        Will collect errors in the context object.

        Note that not all errors might be reported, as the sub-rules are
        evaluated in different stages and each stage aborts evaluation on
        failure (i.e. match/rewrite, primitive rules, complex logic rules,
        `next` sub-rule)

        Returns True iff validation of this rule was successful.
        """
        logger.debug(f"validate_path '{path}', at rule location: {curCtx.location}")

        # special case: trivial bool rule
        if isinstance(rule.__root__, bool):
            logger.debug(f"{curCtx.location}: trivial rule")
            if not rule.__root__:
                curCtx.failed = True
                curCtx.add_error("Reached unsatisfiable 'false' rule")
            return not curCtx.failed

        rl = rule.__root__  # unpack rule (not a bool, so expected to be a Rule)

        # 1. match / rewrite
        # if rewrite is set, don't need to do separate match,just try rewriting
        # match/rewrite does not produce an error on its own, but can fail
        # because "match failure" is usually not "validation failure"
        psl = PathSlice.into(path, curCtx.matchStart, curCtx.matchStop)
        nextPath: str = path  # to be used for implication later on
        if rl.match or rl.rewrite:
            # important! using the match pattern from the context (could be inherited)
            rewritten = psl.rewrite(curCtx.matchPat, rl.rewrite)
            if rewritten is not None:
                nextPath = rewritten.unslice()
            else:  # failed match or rewrite
                op = "rewrite" if rl.rewrite else "match"
                pat = curCtx.matchPat or psl._def_pat
                matchPat = f"match '{pat.pattern}'"
                rwPat = f" and rewrite to '{str(rl.rewrite)}'" if rl.rewrite else ""

                if rl.description:  # add custom error without expanding groups
                    curCtx.add_error(rl.description, op)
                else:
                    curCtx.add_error(f"Failed to {matchPat}{rwPat}", op)
                curCtx.failed = True
                return False

        # 2. proceed with the other primitive constraints

        def add_error(*args):
            """If desc is set, add desc error once and else add passed error."""
            if rl.description is None:
                curCtx.add_error(*args)
            elif rl.description != "" and not curCtx.failed:
                # add error with expanded groups for better error messages
                curCtx.add_error(psl.match(curCtx.matchPat).expand(rl.description))
            # an empty description adds no message, but the rule still fails
            curCtx.failed = True

        # take care of type constraint
        is_file = curCtx.dirAdapter.is_file(path)
        is_dir = curCtx.dirAdapter.is_dir(path)
        if rl.type is not None and not rl.type.is_satisfied(is_file, is_dir):
            msg = f"Entity does not have expected type: '{rl.type.value}'"
            if rl.type == TypeEnum.ANY:
                msg = "Entity must exist (type: true)"
            elif rl.type == TypeEnum.MISSING:
                msg = "Entity must not exist (type: false)"
            add_error(msg, "type", None)

        # take care of metadata JSON Schema validation constraint
        for key in ("valid", "validMeta"):
            if rl.__dict__[key] is None:  # attribute not set
                continue

            if not is_file and not is_dir:
                add_error(f"Path '{path}' does not exist", key, None)
                continue

            # use metadata convention for validMeta
            metapath = path
            if key == "validMeta":
                metapath = curCtx.metaConvention.meta_for(path, is_dir=is_dir)

            # load metadata file
            dat = curCtx.dirAdapter.open_file(metapath)
            if dat is None:
                add_error(f"File '{metapath}' could not be loaded", key, metapath)
                continue

            # prepare correct validation method (JSON Schema or custom plugin)
            schema_or_plugin = resolve_validator(
                rl.__dict__[key],
                local_basedir=self.local_basedir,
                relative_prefix=self.relative_prefix,
            )

            # check whether loaded metadata file should be parsed as JSON
            # NOTE(review): relies on `_for_json` being truthy for handlers that
            # expect parsed JSON -- confirm against `ValidationHandler`
            parse_json = (
                not isinstance(schema_or_plugin, ValidationHandler)
                or schema_or_plugin._for_json
            )
            if parse_json:
                # not a handler plugin for raw data -> load as JSON
                dat = curCtx.dirAdapter.decode_json(dat, metapath)
                if dat is None:
                    add_error(f"File '{metapath}' could not be parsed", key, metapath)
                    continue

            valErrs = validate_metadata(dat, schema_or_plugin)
            if valErrs:
                add_error(valErrs, key, metapath)

        if curCtx.failed:
            return False  # stop validation if primitive checks failed

        # 3. check the complex constraints

        # if-then-else
        # a violated then/else branch fails this rule without a message of its
        # own; the errors of the branch are only merged if `details` is set
        if rl.if_ is not None:
            ifCtx = curCtx.descend(rl.if_, path, "if")
            if self.validate_path(path, rl.if_, ifCtx):
                if rl.then is not None:
                    thenCtx = curCtx.descend(rl.then, path, "then")
                    if not self.validate_path(path, rl.then, thenCtx):
                        curCtx.failed = True
                        if rl.details:
                            curCtx.add_errors(thenCtx.errors)
            else:
                if rl.else_ is not None:
                    elseCtx = curCtx.descend(rl.else_, path, "else")
                    if not self.validate_path(path, rl.else_, elseCtx):
                        curCtx.failed = True
                        if rl.details:
                            curCtx.add_errors(elseCtx.errors)

        # logical operators
        for op in ("allOf", "anyOf", "oneOf"):
            val = rl.__dict__[op]
            opCtx = curCtx.descend(rule, None, op)

            num_rules = len(val)
            if num_rules == 0:
                continue  # empty list of rules -> nothing to do

            num_fails = 0
            suberrs: List[DSValidationErrors] = []
            for idx, r in enumerate(val):
                subCtx = opCtx.descend(r, None, idx)
                success = self.validate_path(path, r, subCtx)
                if success and op == "anyOf":
                    suberrs = []  # don't care about the errors on success
                    break  # we have a satisfied rule -> enough
                elif not success:
                    num_fails += 1
                    if subCtx.errors:
                        suberrs.append(subCtx.errors)

            num_sat = num_rules - num_fails
            err_msg = ""
            if op == "allOf" and num_fails > 0:
                err_msg = "All"
            elif op == "oneOf" and num_fails != num_rules - 1:
                err_msg = "Exactly 1"
            elif op == "anyOf" and num_fails == num_rules:
                err_msg = "At least 1"
            if err_msg:
                err_msg += f" of {num_rules} sub-rules must be satisfied "
                err_msg += f"(satisfied: {num_sat})"
                add_error(err_msg, op, None)
                if rl.details:
                    curCtx.add_errors(*suberrs)

        if rl.not_ is not None:
            notCtx = curCtx.descend(rl.not_, path, "not")
            if self.validate_path(path, rl.not_, notCtx):
                add_error(
                    "Negated sub-rule satisfied, but should have failed", "not", None
                )

        if curCtx.failed:
            return False  # stop validation here if logical expressions failed

        # 4. perform "next" rule, on possibly rewritten path
        if rl.next is not None:
            nextCtx = curCtx.descend(rl.next, nextPath, "next")
            if not self.validate_path(nextPath, rl.next, nextCtx):
                if rl.details:
                    curCtx.add_errors(nextCtx.errors)
                return False

        # no stage failed, i.e. curCtx.failed is still False here
        return True

472 

473 # assert curCtx.failed == False 

474 return True