Coverage for src/dirschema/adapters.py: 94%
154 statements
« prev ^ index » next coverage.py v7.2.5, created at 2023-05-08 15:24 +0000
« prev ^ index » next coverage.py v7.2.5, created at 2023-05-08 15:24 +0000
1"""Interface to perform DirSchema validation on various directory-like structures."""
3import io
4import itertools
5import json
6import zipfile as zip
7from abc import ABC, abstractmethod
8from pathlib import Path
9from typing import IO, Any, Iterable, Optional, Set
11import ruamel.yaml.parser as yaml_parser
13from .json.parse import yaml
15try:
16 import h5py
17 import numpy
18except ImportError:
19 _has_h5 = False
20else:
21 _has_h5 = True
24def _require_h5py():
25 """Raise exception if h5py is not installed."""
26 if not _has_h5:
27 raise ImportError("Install dirschema with [h5] extra for HDF5 support!")
30# NOTE: currently, completely ignores existence of symlinks
31# the h5py visit function does ignore them too,
32# also reasonable validation behavior is unclear.
33# either stick to this, or find a reasonable semantics + workaround for HDF5
36class IDirectory(ABC):
37 """Abstract interface for things that behave like directories and files.
39 An adapter is intended to be instantiated mainly using `for_path`,
40 based on a regular path in the file system.
42 Use the constructor to initialize an adapter for a more general data
43 source (such as an object to work with an open archive file, etc.)
44 """
46 @abstractmethod
47 def __init__(cls, obj: object) -> None:
48 """Initialized an instance for a suitable directory-like object."""
50 @classmethod
51 @abstractmethod
52 def for_path(cls, path: Path):
53 """Return an instance for a path to a archive file or a directory.
55 Args:
56 path: Path to a file or directory compatible with the adapter.
57 """
59 @abstractmethod
60 def get_paths(self) -> Iterable[str]:
61 """Return paths relative to root directory that are to be checked."""
63 @abstractmethod
64 def is_dir(self, path: str) -> bool:
65 """Return whether the path is (like) a directory."""
67 @abstractmethod
68 def is_file(self, path: str) -> bool:
69 """Return whether the path is (like) a file."""
71 @abstractmethod
72 def open_file(self, path: str) -> Optional[IO[bytes]]:
73 """Try loading data from file at given path (to perform validation on it)."""
75 def decode_json(self, data: IO[bytes], path: str) -> Optional[Any]:
76 """Try parsing binary data stream as JSON (to perform validation on it).
78 Second argument is the path of the opened stream.
80 Default implementation will first try parsing as JSON, then as YAML.
81 """
82 try:
83 return json.load(data)
84 except json.JSONDecodeError:
85 try:
86 return yaml.load(data)
87 except yaml_parser.ParserError:
88 return None
90 def load_meta(self, path: str) -> Optional[Any]:
91 """Use open_file and decode_json to load JSON metadata."""
92 f = self.open_file(path)
93 return self.decode_json(f, path) if f is not None else None
96class RealDir(IDirectory):
97 """Pass-through implementation for working with actual file system."""
99 def __init__(self, path: Path) -> None:
100 """Initialize adapter for a plain directory path."""
101 self.base = path
103 @classmethod
104 def for_path(cls, path: Path):
105 """See [dirschema.adapters.IDirectory.for_path][]."""
106 return cls(path)
108 def get_paths(self) -> Iterable[str]:
109 """See [dirschema.adapters.IDirectory.get_paths][]."""
110 paths = filter(lambda p: not p.is_symlink(), sorted(self.base.rglob("*")))
111 return itertools.chain(
112 [""], map(lambda p: str(p.relative_to(self.base)), paths)
113 )
115 def open_file(self, path: str) -> Optional[IO[bytes]]:
116 """See [dirschema.adapters.IDirectory.open_file][]."""
117 try:
118 return open(self.base / path, "rb")
119 except (FileNotFoundError, IsADirectoryError):
120 return None
122 def is_dir(self, dir: str) -> bool:
123 """See [dirschema.adapters.IDirectory.is_dir][]."""
124 return (self.base / dir).is_dir()
126 def is_file(self, dir: str) -> bool:
127 """See [dirschema.adapters.IDirectory.is_file][]."""
128 return (self.base / dir).is_file()
131class ZipDir(IDirectory):
132 """Adapter for working with zip files (otherwise equivalent to `RealDir`)."""
134 def __init__(self, zip_file: zip.ZipFile):
135 """Initialize adapter for a zip file."""
136 self.file: zip.ZipFile = zip_file
137 self.names: Set[str] = set(self.file.namelist())
138 self.names.add("/")
140 @classmethod
141 def for_path(cls, path: Path):
142 """See [dirschema.adapters.IDirectory.for_path][]."""
143 opened = zip.ZipFile(path, "r") # auto-closed on GC, no need to do anything
144 return cls(opened)
146 def get_paths(self) -> Iterable[str]:
147 """See [dirschema.adapters.IDirectory.get_paths][]."""
148 return itertools.chain(map(lambda s: s.rstrip("/"), sorted(self.names)))
150 def open_file(self, path: str) -> Optional[IO[bytes]]:
151 """See [dirschema.adapters.IDirectory.open_file][]."""
152 try:
153 return self.file.open(path)
154 except (KeyError, IsADirectoryError):
155 return None
157 # as is_dir and is_file of zip.Path appear to work purely syntactically,
158 # they're useless for us. We rather just lookup in the list of paths we need anyway
160 def is_dir(self, dir: str) -> bool:
161 """See [dirschema.adapters.IDirectory.is_dir][]."""
162 cand_name: str = dir.rstrip("/") + "/"
163 return cand_name in self.names
165 def is_file(self, dir: str) -> bool:
166 """See [dirschema.adapters.IDirectory.is_file][]."""
167 cand_name: str = dir.rstrip("/")
168 return cand_name in self.names
171class H5Dir(IDirectory):
172 """Adapter for working with HDF5 files.
174 Attributes do not fit nicely into the concept of just directories and files.
175 The following conventions are used to checking attributes:
177 An attribute 'attr' of some dataset or group '/a/b'
178 is mapped to the path '/a/b@attr' and is interpreted as a file.
180 Therefore, '@' MUST NOT be used in names of groups, datasets or attributes.
182 Only JSON is supported for the metadata, assuming that HDF5 files are usually not
183 constructed by hand (which is the main reason for YAML support in the other cases).
185 All stored metadata entities must have a name ending with ".json"
186 in order to distinguish them from plain strings. This is done because datasets
187 or attributes are often used for storing simple values that could also be
188 validated using a JSON Schema.
189 """
191 _ATTR_SEP = "@"
192 """Separator used in paths to separate a HDF5 node from an attribute."""
194 _JSON_SUF = ".json"
195 """Suffix used in leaf nodes to distinguish strings from JSON-serialized data."""
197 def __init__(self, hdf5_file: h5py.File) -> None:
198 """Initialize adapter for a HDF5 file."""
199 self.file: h5py.File = hdf5_file
201 @classmethod
202 def for_path(cls, dir: Path):
203 """See [dirschema.adapters.IDirectory.for_path][]."""
204 _require_h5py()
205 opened = h5py.File(dir, "r") # auto-closed on GC, no need to do anything
206 return cls(opened)
208 def get_paths(self) -> Iterable[str]:
209 """See [dirschema.adapters.IDirectory.get_paths][]."""
210 ret = [""]
211 for atr in self.file["/"].attrs.keys():
212 ret.append(f"{self._ATTR_SEP}{atr}")
214 def collect(name: str) -> None:
215 if name.find(self._ATTR_SEP) >= 0:
216 raise ValueError(f"Invalid name, must not contain {self._ATTR_SEP}!")
217 ret.append(name)
218 for atr in self.file[name].attrs.keys():
219 ret.append(f"{name}{self._ATTR_SEP}{atr}")
221 self.file.visit(collect)
222 return ret
224 def is_dir(self, path: str) -> bool:
225 """See [dirschema.adapters.IDirectory.is_dir][]."""
226 if path == "":
227 return True # root directory
228 if path.find(self._ATTR_SEP) >= 0 or path not in self.file:
229 return False # not existing or is an attribute
230 if isinstance(self.file[path], h5py.Group):
231 return True # is a group
232 return False # something that exists, but is not a group
234 def is_file(self, path: str) -> bool:
235 """See [dirschema.adapters.IDirectory.is_file][]."""
236 # attributes (treated like special files) exist
237 # if underlying group/dataset exists
238 if path.find(self._ATTR_SEP) >= 0:
239 p = path.split(self._ATTR_SEP)
240 p[0] = p[0] or "/"
241 return p[0] in self.file and p[1] in self.file[p[0]].attrs
242 else:
243 # otherwise check it is a dataset (= "file")
244 return path in self.file and isinstance(self.file[path], h5py.Dataset)
246 def decode_json(self, data: IO[bytes], path: str) -> Optional[Any]:
247 """See [dirschema.adapters.IDirectory.decode_json][]."""
248 bs = data.read()
249 try:
250 ret = json.loads(bs)
251 except json.JSONDecodeError:
252 return None
254 if isinstance(ret, dict) and not path.endswith(self._JSON_SUF):
255 return bs
256 else:
257 return ret
259 def open_file(self, path: str) -> Optional[IO[bytes]]:
260 """See [dirschema.adapters.IDirectory.open_file][]."""
261 p = path
262 if p.find(self._ATTR_SEP) >= 0:
263 # try treating as attribute, return data if it is a string
264 f, s = p.split(self._ATTR_SEP)
265 f = f or "/"
266 if f in self.file and s in self.file[f].attrs:
267 dat = self.file[f].attrs[s]
268 if isinstance(dat, h5py.Empty):
269 return None
270 if isinstance(dat, str):
271 if not path.endswith(self._JSON_SUF):
272 dat = f'"{dat}"' # JSON-encoded string
273 else:
274 dat = json.dumps(dat.tolist())
275 return io.BytesIO(dat.encode("utf-8"))
276 else:
277 return None
279 # check that the path exists and is a dataset, but not a numpy array
280 if p not in self.file:
281 return None
283 dat = self.file[p]
284 if not isinstance(dat, h5py.Dataset):
285 return None
287 bs: Any = dat[()]
288 if isinstance(bs, numpy.ndarray):
289 return None
291 # the only kinds of datasets we accept are essentially utf-8 strings
292 # which are represented as possibly wrapped bytes
293 if isinstance(bs, numpy.void): # void-wrapped bytes -> unpack
294 bs = bs.tobytes()
296 return io.BytesIO(bs)
299def get_adapter_for(path: Path) -> IDirectory:
300 """Return suitable interface adapter based on path and file extension.
302 Args:
303 path: Path to directory or archive file
305 Returns:
306 An adapter instance that can be used for dirschema validation.
308 Raises:
309 ValueError: If no suitable adapter was found for the path.
310 """
311 if path.is_dir():
312 return RealDir.for_path(path)
314 if path.is_file():
315 if path.name.endswith("zip"):
316 return ZipDir.for_path(path)
317 elif path.name.endswith(("h5", "hdf5")):
318 _require_h5py()
319 return H5Dir.for_path(path)
321 raise ValueError(f"Found no suitable dirschema adapter for path: {path}")