Coverage for src/dirschema/core.py: 97%

148 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-07 09:34 +0000

1"""Core types of dirschema.""" 

2 

3from __future__ import annotations 

4 

5import io 

6import json 

7import re 

8from enum import Enum 

9from pathlib import Path 

10from typing import List, Optional, Pattern, Tuple, Union 

11 

12from jsonschema import Draft202012Validator 

13from pydantic import BaseModel, Extra, Field, root_validator 

14from ruamel.yaml import YAML 

15from typing_extensions import Final 

16 

# Module-wide ruamel.yaml instance configured with the "safe" loader/dumper type.
# Within this module it is used by Rule.__repr__ to dump rules as YAML.
yaml = YAML(typ="safe")

18 

19 

class MetaConvention(BaseModel):
    """Filename convention for metadata files that are associated with other entities.

    It defines where to look for metadata for files that are not themselves known
    as json, or metadata concerning directories.

    At the same time, these files are ignored by themselves and act as "sidecar" files.
    """

    # Directory segment expected at the very beginning of a metadata path.
    pathPrefix: str = ""
    # Directory segment expected directly before the metadata filename.
    pathSuffix: str = ""
    # String prepended to the entity name to form the metadata filename.
    filePrefix: str = ""
    # String appended to the entity name to form the metadata filename.
    fileSuffix: str = "_meta.json"

    @root_validator
    def check_valid(cls, values):
        """Check that at least one of filePrefix and fileSuffix is non-empty.

        Raises:
            ValueError: if both the file prefix and the file suffix are empty.
        """
        file_pref_or_suf = values.get("filePrefix", "") or values.get("fileSuffix", "")
        if not file_pref_or_suf:
            raise ValueError("At least one of filePrefix or fileSuffix must be set!")
        return values

    def to_tuple(self) -> Tuple[str, str, str, str]:
        """Convert convention instance to tuple (e.g. used within CLI).

        The order is (pathPrefix, pathSuffix, filePrefix, fileSuffix), which is
        the same order `from_tuple` expects.
        """
        return (self.pathPrefix, self.pathSuffix, self.filePrefix, self.fileSuffix)

    @classmethod
    def from_tuple(cls, pp: str, ps: str, fp: str, fs: str) -> MetaConvention:
        """Return new metadata file convention built from the four components.

        Args:
            pp: path prefix
            ps: path suffix
            fp: file prefix
            fs: file suffix
        """
        # Construct via cls (not the hard-coded base class) so that calling
        # this on a subclass yields an instance of that subclass.
        return cls(pathPrefix=pp, pathSuffix=ps, filePrefix=fp, fileSuffix=fs)

    def is_meta(self, path: str) -> bool:
        """Check whether given path is a metadata file according to the convention.

        The last path segment must carry the configured file prefix/suffix,
        the first segment must equal pathPrefix (if set) and the segment
        right before the filename must equal pathSuffix (if set).
        """
        parts = Path(path).parts
        if not parts:  # root dir
            return False

        filename = parts[-1]
        if self.filePrefix and not filename.startswith(self.filePrefix):
            return False
        if self.fileSuffix and not filename.endswith(self.fileSuffix):
            return False

        # The filename plus one extra segment for each configured path piece
        # must be present, otherwise the index checks below make no sense.
        required = 1 + int(self.pathPrefix != "") + int(self.pathSuffix != "")
        if len(parts) < required:
            return False

        prefix_ok = self.pathPrefix == "" or parts[0] == self.pathPrefix
        suffix_ok = self.pathSuffix == "" or parts[-2] == self.pathSuffix
        return prefix_ok and suffix_ok

    def meta_for(self, path: str, is_dir: bool = False) -> str:
        """Return metadata filename for provided path, based on this convention.

        For a file, the result is
        ``<pathPrefix>/<parent dirs>/<pathSuffix>/<filePrefix><name><fileSuffix>``.
        For a directory, the metadata file lives inside of it:
        ``<pathPrefix>/<parent dirs>/<name>/<pathSuffix>/<filePrefix><fileSuffix>``.
        Empty prefix/suffix components are left out.
        """
        segments = list(Path(path).parts)
        name = segments[-1] if segments else ""

        target = []
        if self.pathPrefix != "":
            target.append(self.pathPrefix)
        target += segments[:-1]

        if is_dir:
            # Directory metadata is located inside the directory itself,
            # so the directory name becomes part of the path, not the filename.
            target.append(name)
        if self.pathSuffix != "":
            target.append(self.pathSuffix)

        basename = "" if is_dir else name
        target.append(self.filePrefix + basename + self.fileSuffix)
        return str(Path().joinpath(*target))

91 

92 

class PathSlice(BaseModel):
    """Helper class to slice into path segments and do regex-based match/substitution.

    Invariant: into(path, sl).unslice() == path for all sl and path.

    Note that `unslice` drops pieces that are empty or None, so the invariant
    relies on the prefix/suffix pieces not being empty strings that carry
    meaning (e.g. the empty first segment of a path with a leading slash).
    """

    # Joined segments located before the slice (None if there are none).
    slicePre: Optional[str]
    # Joined segments selected by the slice; match/rewrite operate on this part.
    sliceStr: str
    # Joined segments located after the slice (None if there are none).
    sliceSuf: Optional[str]

    @classmethod
    def into(
        cls, path: str, start: Optional[int] = None, stop: Optional[int] = None
    ) -> PathSlice:
        """Slice into a path, splitting on the slashes.

        Slice semantics is mostly like Python, except that stop=0 means
        "until the end", so that [0:0] means the full path.

        Args:
            path: slash-separated path string
            start: index of first segment of the slice (None or 0: from beginning)
            stop: index after the last segment of the slice (None or 0: until end)
        """
        segs = path.split("/")
        pref = "/".join(segs[: start if start else 0])
        inner = "/".join(segs[start : stop if stop != 0 else None])  # noqa: E203
        suf = "/".join(segs[stop:] if stop else [])
        return cls(
            slicePre=pref if pref else None,
            sliceStr=inner,
            sliceSuf=suf if suf else None,
        )

    def unslice(self) -> str:
        """Inverse of slice operation (recovers complete path string)."""
        return "/".join([x for x in [self.slicePre, self.sliceStr, self.sliceSuf] if x])

    _def_pat = re.compile("(.*)")
    """Default pattern (match anything, put into capture group)."""

    def match(self, pat: Optional[Union[re.Pattern, str]] = None):
        """Do full regex match on current slice.

        Args:
            pat: compiled pattern or pattern string; if None, the default
                match-anything pattern is used.

        Returns:
            The match object, or None if the slice string does not fully match.
        """
        # Only a missing pattern selects the default. An explicitly passed
        # empty string is a valid pattern (matching only the empty slice) and
        # must not silently turn into "match anything".
        if pat is None:
            pat = self._def_pat
        elif isinstance(pat, str):
            pat = re.compile(pat)
        return pat.fullmatch(self.sliceStr)

    def rewrite(
        self, pat: Optional[Union[re.Pattern, str]] = None, sub: Optional[str] = None
    ) -> Optional[PathSlice]:
        """Match and rewrite in the slice string and return a new PathSlice.

        If no pattern given, default pattern is used.
        If no substitution is given, just match on pattern is performed.
        Returns new PathSlice with possibly rewritten slice.
        Returns None if match fails.
        Raises exception if rewriting fails due to e.g. invalid capture groups.
        """
        if m := self.match(pat):
            ret = self.copy()
            if sub is not None:
                # expand the substitution template using the capture groups
                ret.sliceStr = m.expand(sub)
            return ret
        return None

153 

154 

class JSONSchema(BaseModel):
    """Wrapper type that lets pydantic accept an arbitrary JSON Schema as a value.

    Validation only checks that the value is a well-formed JSON Schema
    (Draft 2020-12); the value itself is passed through unchanged.
    """

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables to be run for values of this type."""
        yield from (cls.validate,)

    @classmethod
    def validate(cls, v):
        """Return the given schema unmodified after checking that it is valid."""
        # check_schema throws SchemaError if the schema is invalid
        Draft202012Validator.check_schema(v)
        return v

166 

167 

class TypeEnum(Enum):
    """Possible values for a path type inside a dirschema rule.

    MISSING means that the path must not exist (i.e. neither file nor directory),
    whereas ANY means that any of these options is fine, as long as the path exists.
    """

    MISSING = False
    FILE = "file"
    DIR = "dir"
    ANY = True

    def is_satisfied(self, is_file: bool, is_dir: bool) -> bool:
        """Tell whether a path with the given flags fulfills this path type.

        Args:
            is_file: whether the path is an existing file
            is_dir: whether the path is an existing directory
        """
        exists = bool(is_file or is_dir)
        if self is TypeEnum.MISSING:
            return not exists
        if self is TypeEnum.ANY:
            return exists
        if self is TypeEnum.DIR:
            return bool(is_dir)
        if self is TypeEnum.FILE:
            return bool(is_file)
        return True

191 

192 

# Fallback regex: matches any string and captures all of it in group 1.
DEF_MATCH: Final[str] = "(.*)"
"""Default match regex to assume when none is set, but required by semantics."""

# Fallback substitution template: a reference to capture group 1.
DEF_REWRITE: Final[str] = "\\1"
"""Default rewrite rule to assume when none is set, but required by semantics."""

198 

199 

class DSRule(BaseModel):
    """Wrapper for a DirSchema rule that is a plain boolean or a full Rule object.

    Parse with this class whenever it is not known in advance which of the
    two forms is present.
    """

    __root__: Union[bool, Rule]

    def __init__(self, b: Optional[bool] = None, **kwargs):
        """Wrap either the positional value, a given __root__, or a new Rule.

        A non-None `b` is wrapped directly. Otherwise a lone `__root__`
        keyword is passed through, and any other keyword arguments are used
        to construct the wrapped Rule.

        Raises:
            ValueError: if `__root__` is combined with further keyword arguments.
        """
        if b is not None:
            wrapped = b
        elif "__root__" not in kwargs:
            wrapped = Rule(**kwargs)
        elif len(kwargs) == 1:
            wrapped = kwargs["__root__"]
        else:
            raise ValueError("No extra kwargs may be passed with __root__!")
        super().__init__(__root__=wrapped)

    def __repr__(self) -> str:
        """Return the repr of the wrapped value, hiding the wrapper itself."""
        inner = self.__root__
        if not isinstance(inner, bool):
            return repr(inner)
        return "true" if inner else "false"

    def __bool__(self):
        """Return the truth value of the wrapped object."""
        return bool(self.__root__)

229 

230 

class Rule(BaseModel):
    """A DirSchema is a conjunction of a subset of distinct constraints/keywords."""

    # ---- primitive constraints ----

    type: Optional[TypeEnum] = Field(
        description="Check that path is a file / is a dir."
    )

    valid: Optional[Union[JSONSchema, str]] = Field(
        description="Validate file against provided schema or validator."
    )

    # like 'valid', but the metadata convention determines the path to validate
    validMeta: Optional[Union[JSONSchema, str]] = Field(
        description="Validate external metadata against provided schema or validator."
    )

    # ---- logical operators (in the style of JSON Schema) ----

    allOf: List[DSRule] = Field([], description="Conjunction (evaluated in order).")

    anyOf: List[DSRule] = Field([], description="Disjunction (evaluated in order).")

    oneOf: List[DSRule] = Field([], description="Exact-1-of-N (evaluated in order).")

    not_: Optional[DSRule] = Field(description="Negation of a rule.", alias="not")

    # ---- conditional evaluation ----
    # Exists for better error reporting (the 'if' rule yields no error message
    # on failure), i.e. this is dirschema "control-flow".
    if_: Optional[DSRule] = Field(
        description="Depending on result of rule, will proceed with 'then' or 'else'.",
        alias="if",
    )

    then: Optional[DSRule] = Field(
        description="Evaluated if 'if' rule exists and satisfied.",
    )

    else_: Optional[DSRule] = Field(
        description="Evaluated if 'if' rule exists and not satisfied.",
        alias="else",
    )

    # ---- match and rewrite (path inspection and manipulation) ----

    # The total match and its capture groups are kept for possible rewriting.
    # The match data + start/stop is also inherited down into children.
    match: Optional[Pattern] = Field(
        description="Path must match. Sets capture groups."
    )

    # Indices of path segments (i.e. array parts after splitting on /):
    # - matchStart < matchStop (unless start is positive and stop negative)
    # - matchStart = -1 means: match only in the final segment
    # It is a Python slice without the 'step' option, so missing segments are
    # ignored. To require an "exact" number of segments, first match on a
    # pattern with the required number of slashes.
    matchStart: Optional[int]
    matchStop: Optional[int]

    # the rewrite is only performed if the match was successful
    rewrite: Optional[str]

    # If rewrite is set, 'next' is applied to the rewritten path instead of the
    # original. A missing rewrite is like rewrite \1, a missing match like ".*".
    next: Optional[DSRule] = Field(
        description="If current rule is satisfied, evaluate the 'next' rule."
    )

    # ---- error reporting customization ----

    # Overrides all other errors on the level of this rule (but keeps subrule
    # errors). If set to "", no error message is reported for this rule.
    description: Optional[str] = Field(
        None,
        description="Custom error message to be shown to the user if this rule fails.",
    )
    # Allows pruning noisy accumulated error messages to a high level summary.
    details: bool = Field(
        True,
        description="If set, keep errors from sub-rules, otherwise ignore them.",
    )

    # ----

    def __repr__(self, stream=None) -> str:
        """Render the rule as YAML, including only the non-default values.

        Without a stream, the YAML text is returned (stripped of surrounding
        whitespace). With a stream, the YAML is written to the stream and an
        empty string is returned.
        """
        data = json.loads(self.json(exclude_defaults=True))

        if stream:
            yaml.dump(data, stream)
            return ""

        buffer = io.StringIO()
        yaml.dump(data, buffer)
        return buffer.getvalue().strip()

    class Config:  # noqa: D106
        extra = Extra.forbid

329 

330 

# Rule and DSRule refer to each other in their field annotations (DSRule wraps
# a Rule, Rule fields contain DSRule), so the forward references are resolved
# here, after both classes have been defined.
Rule.update_forward_refs()
DSRule.update_forward_refs()