Coverage for src/dirschema/core.py: 97%
148 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-07 09:34 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-07 09:34 +0000
1"""Core types of dirschema."""
3from __future__ import annotations
5import io
6import json
7import re
8from enum import Enum
9from pathlib import Path
10from typing import List, Optional, Pattern, Tuple, Union
12from jsonschema import Draft202012Validator
13from pydantic import BaseModel, Extra, Field, root_validator
14from ruamel.yaml import YAML
15from typing_extensions import Final
# Module-wide YAML handler created with typ="safe"; Rule.__repr__ dumps through it.
yaml = YAML(typ="safe")
class MetaConvention(BaseModel):
    """Filename convention for metadata files that are associated with other entities.

    It defines where to look for metadata for files that are not themselves known
    as json, or metadata concerning directories.

    At the same time, these files are ignored by themselves and act as "sidecar" files.
    """

    # If non-empty: first path segment of every metadata file path.
    pathPrefix: str = ""
    # If non-empty: name of the directory that directly contains the metadata file.
    pathSuffix: str = ""
    # If non-empty: string the metadata file name starts with.
    filePrefix: str = ""
    # If non-empty: string the metadata file name ends with.
    fileSuffix: str = "_meta.json"

    @root_validator
    def check_valid(cls, values):
        """Check that at least one filename extension is non-empty.

        Raises:
            ValueError: if both filePrefix and fileSuffix are empty.
        """
        if not (values.get("filePrefix", "") or values.get("fileSuffix", "")):
            raise ValueError("At least one of filePrefix or fileSuffix must be set!")
        return values

    def to_tuple(self) -> Tuple[str, str, str, str]:
        """Convert convention instance to tuple (e.g. used within CLI).

        The order is (pathPrefix, pathSuffix, filePrefix, fileSuffix), i.e. the
        same order that from_tuple expects.
        """
        return (self.pathPrefix, self.pathSuffix, self.filePrefix, self.fileSuffix)

    @classmethod
    def from_tuple(cls, pp: str, ps: str, fp: str, fs: str) -> MetaConvention:
        """Return new metadata file convention.

        Arguments are pathPrefix, pathSuffix, filePrefix and fileSuffix (in that
        order). The instance is built via `cls`, so that calling this on a
        subclass yields an instance of that subclass.
        """
        return cls(pathPrefix=pp, pathSuffix=ps, filePrefix=fp, fileSuffix=fs)

    def is_meta(self, path: str) -> bool:
        """Check whether given path is a metadata file according to the convention.

        A path without any parts (root dir) is never a metadata file.
        """
        parts = Path(path).parts
        if not parts:  # root dir
            return False

        # the file name must carry the configured prefix and suffix
        filename = parts[-1]
        if self.filePrefix and not filename.startswith(self.filePrefix):
            return False
        if self.fileSuffix and not filename.endswith(self.fileSuffix):
            return False

        # each configured path prefix/suffix needs a segment besides the file name
        required = 1 + int(bool(self.pathPrefix)) + int(bool(self.pathSuffix))
        if len(parts) < required:
            return False

        prefix_ok = not self.pathPrefix or parts[0] == self.pathPrefix
        suffix_ok = not self.pathSuffix or parts[-2] == self.pathSuffix
        return prefix_ok and suffix_ok

    def meta_for(self, path: str, is_dir: bool = False) -> str:
        """Return metadata filename for provided path, based on this convention.

        For a file, the metadata file is a sibling (inside the pathSuffix
        directory, if set) named filePrefix + name + fileSuffix.
        For a directory (is_dir=True), the metadata file is located inside that
        directory (again below pathSuffix, if set), named filePrefix + fileSuffix.
        In both cases pathPrefix (if set) becomes the first segment.
        """
        segments = list(Path(path).parts)
        name = segments[-1] if segments else ""

        result = []
        if self.pathPrefix:
            result.append(self.pathPrefix)
        result.extend(segments[:-1])

        if is_dir:
            # metadata of a directory lives inside of that directory
            result.append(name)
            if self.pathSuffix:
                result.append(self.pathSuffix)
            result.append(self.filePrefix + self.fileSuffix)
        else:
            if self.pathSuffix:
                result.append(self.pathSuffix)
            result.append(self.filePrefix + name + self.fileSuffix)

        return str(Path().joinpath(*result))
class PathSlice(BaseModel):
    """Helper class to slice into path segments and do regex-based match/substitution.

    Intended invariant: into(path, sl).unslice() == path for all sl and path.
    """

    # segments before the slice, joined by "/" (None if there are none)
    slicePre: Optional[str]
    # the selected segments, joined by "/"
    sliceStr: str
    # segments after the slice, joined by "/" (None if there are none)
    sliceSuf: Optional[str]

    @classmethod
    def into(
        cls, path: str, start: Optional[int] = None, stop: Optional[int] = None
    ) -> PathSlice:
        """Slice into a path, splitting on the slashes.

        Slice semantics is mostly like Python, except that stop=0 means
        "until the end", so that [0:0] means the full path.
        """
        segments = path.split("/")

        # stop == 0 is the special "until the end" marker
        end = None if stop == 0 else stop

        before = "/".join(segments[: start or 0])
        selected = "/".join(segments[start:end])
        after = "/".join(segments[stop:] if stop else [])

        return PathSlice(
            slicePre=before or None,
            sliceStr=selected,
            sliceSuf=after or None,
        )

    def unslice(self) -> str:
        """Inverse of slice operation (recovers complete path string)."""
        pieces = (self.slicePre, self.sliceStr, self.sliceSuf)
        # missing (None) and empty pieces are dropped before joining
        return "/".join(piece for piece in pieces if piece)

    _def_pat = re.compile("(.*)")
    """Default pattern (match anything, put into capture group)."""

    def match(self, pat: Optional[Union[re.Pattern, str]] = None):
        """Do full regex match on current slice.

        The pattern may be a compiled pattern or a string. A falsy pattern
        (None, but also the empty string) is replaced by the default pattern.
        Returns the result of fullmatch on the slice string.
        """
        pattern = pat or self._def_pat
        if isinstance(pattern, str):
            pattern = re.compile(pattern)
        return pattern.fullmatch(self.sliceStr)

    def rewrite(
        self, pat: Optional[Union[re.Pattern, str]] = None, sub: Optional[str] = None
    ) -> Optional[PathSlice]:
        """Match and rewrite in the slice string and return a new PathSlice.

        If no pattern given, default pattern is used.
        If no substitution is given, just match on pattern is performed.
        Returns new PathSlice with possibly rewritten slice.
        Returns None if match fails.
        Raises exception if rewriting fails due to e.g. invalid capture groups.
        """
        found = self.match(pat)
        if not found:
            return None

        # work on a copy, this instance is left untouched
        result = self.copy()
        if sub is not None:
            result.sliceStr = found.expand(sub)
        return result
class JSONSchema(BaseModel):
    """Helper class wrapping an arbitrary JSON Schema to be acceptable for pydantic."""

    @classmethod
    def __get_validators__(cls):  # noqa: D105
        """Yield the validators of this type (only `validate`)."""
        yield cls.validate

    @classmethod
    def validate(cls, v):  # noqa: D102
        """Return the passed value unchanged, if it is a valid schema.

        The check is done with Draft202012Validator.check_schema,
        which throws SchemaError if the schema is invalid.
        """
        Draft202012Validator.check_schema(v)
        return v
class TypeEnum(Enum):
    """Possible values for a path type inside a dirschema rule.

    MISSING means that the path must not exist (i.e. neither file or directory),
    whereas ANY means that any of these options is fine, as long as the path exists.
    """

    MISSING = False
    FILE = "file"
    DIR = "dir"
    ANY = True

    def is_satisfied(self, is_file: bool, is_dir: bool) -> bool:
        """Check whether the flags of a path satisfy this path type.

        Args:
            is_file: whether the path is a file.
            is_dir: whether the path is a directory.

        Returns:
            True if the combination of flags is acceptable for this type.
        """
        exists = bool(is_file or is_dir)

        if self is TypeEnum.MISSING:
            return not exists
        if self is TypeEnum.ANY:
            return exists
        if self is TypeEnum.DIR:
            return bool(is_dir)
        if self is TypeEnum.FILE:
            return bool(is_file)
        return True
DEF_MATCH: Final[str] = "(.*)"
"""Match regex used as fallback when none is set, but the semantics require one."""

DEF_REWRITE: Final[str] = "\\1"
"""Rewrite rule used as fallback when none is set, but the semantics require one."""
class DSRule(BaseModel):
    """A DirSchema rule is either a trivial (boolean) rule, or a complex object.

    Use this class for parsing, if it is not known which of these it is.
    """

    # Rule is defined further below, the reference is resolved at the end of the file
    __root__: Union[bool, Rule]

    def __init__(self, b: Optional[bool] = None, **kwargs):
        """Construct wrapped boolean or object, depending on arguments.

        If a boolean is passed, it is wrapped (and kwargs are ignored).
        Otherwise, either __root__ is passed as the only keyword argument,
        or all keyword arguments are used to construct the wrapped Rule.

        Raises:
            ValueError: if __root__ is passed together with other kwargs.
        """
        if b is not None:
            root = b
        elif "__root__" in kwargs:
            if len(kwargs) != 1:
                raise ValueError("No extra kwargs may be passed with __root__!")
            root = kwargs["__root__"]
        else:
            root = Rule(**kwargs)
        super().__init__(__root__=root)

    def __repr__(self) -> str:
        """Make wrapper transparent and just return repr of wrapped object."""
        wrapped = self.__root__
        if isinstance(wrapped, bool):
            # booleans are printed in lowercase
            return "true" if wrapped else "false"
        return repr(wrapped)

    def __bool__(self):
        """Just return value for wrapped object."""
        return bool(self.__root__)
class Rule(BaseModel):
    """A DirSchema is a conjunction of a subset of distinct constraints/keywords."""

    # ---- primitive constraints ----

    type: Optional[TypeEnum] = Field(
        description="Check that path is a file / is a dir."
    )

    valid: Optional[Union[JSONSchema, str]] = Field(
        description="Validate file against provided schema or validator."
    )

    # the provided metadataConvention is used for rewriting to the right path
    validMeta: Optional[Union[JSONSchema, str]] = Field(
        description="Validate external metadata against provided schema or validator."
    )

    # ---- logical operators (JSON-Schema-like) ----

    allOf: List[DSRule] = Field([], description="Conjunction (evaluated in order).")

    anyOf: List[DSRule] = Field([], description="Disjunction (evaluated in order).")

    oneOf: List[DSRule] = Field([], description="Exact-1-of-N (evaluated in order).")

    not_: Optional[DSRule] = Field(description="Negation of a rule.", alias="not")

    # ---- dirschema "control-flow" ----
    # (introduced for better error reporting, yields no error message on failure)

    if_: Optional[DSRule] = Field(
        description="Depending on result of rule, will proceed with 'then' or 'else'.",
        alias="if",
    )

    then: Optional[DSRule] = Field(
        description="Evaluated if 'if' rule exists and satisfied.",
    )

    else_: Optional[DSRule] = Field(
        description="Evaluated if 'if' rule exists and not satisfied.",
        alias="else",
    )

    # ---- match and rewrite (path inspection and manipulation) ----

    # the total match and the capture groups are kept for possible rewriting,
    # the match data + start/end is also inherited down into children
    match: Optional[Pattern] = Field(
        description="Path must match. Sets capture groups."
    )

    # Indices of path segments (i.e. array parts after splitting on /):
    # - matchStart < matchEnd (unless start pos. and end neg.)
    # - matchStart = -1 = match only in final segment
    # It's a python slice without the 'step' option, so missing segments are ignored.
    # To have an "exact" number of segments,
    # match on a pattern with the required number of / first.
    matchStart: Optional[int]
    matchStop: Optional[int]

    # the rewrite is only done if the match was successful
    rewrite: Optional[str]

    # if rewrite is set, 'next' is applied to the rewritten path instead of original
    # (missing rewrite is like rewrite \1, missing match is like ".*")
    next: Optional[DSRule] = Field(
        description="If current rule is satisfied, evaluate the 'next' rule."
    )

    # ---- error reporting ----

    # Customizable error message, overriding all other errors on the level of this
    # rule (but keeping subrule errors).
    # If set to "", no error message is reported for this rule.
    description: Optional[str] = Field(
        None,
        description="Custom error message to be shown to the user if this rule fails.",
    )
    # used to prune noisy error message accumulation to some high level summary
    details: bool = Field(
        True,
        description="If set, keep errors from sub-rules, otherwise ignore them.",
    )

    # ----

    def __repr__(self, stream=None) -> str:
        """Print out the rule as YAML (only the non-default values).

        If a stream is passed, the YAML is written to it and an empty string
        is returned. Otherwise, the YAML is returned as a (stripped) string.
        """
        # NOTE(review): by_alias is not passed to json(), so aliased fields are
        # presumably dumped under their Python names (not_, if_, else_) - confirm.
        data = json.loads(self.json(exclude_defaults=True))

        if stream:
            yaml.dump(data, stream)
            return ""

        buffer = io.StringIO()
        yaml.dump(data, buffer)
        return buffer.getvalue().strip()

    class Config:  # noqa: D106
        extra = Extra.forbid
# Rule and DSRule refer to each other in their field annotations, so the
# forward references are resolved here, after both classes are defined.
Rule.update_forward_refs()
DSRule.update_forward_refs()