Coverage for src/dirschema/adapters.py: 94%

154 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-07 09:34 +0000

1"""Interface to perform DirSchema validation on various directory-like structures.""" 

2 

3import io 

4import itertools 

5import json 

6import zipfile as zip 

7from abc import ABC, abstractmethod 

8from pathlib import Path 

9from typing import IO, Any, Iterable, Optional, Set 

10 

11import ruamel.yaml.parser as yaml_parser 

12 

13from .json.parse import yaml 

14 

15try: 

16 import h5py 

17 import numpy 

18except ImportError: 

19 _has_h5 = False 

20else: 

21 _has_h5 = True 

22 

23 

def _require_h5py():
    """Ensure HDF5 support is available.

    Raises:
        ImportError: If the optional h5py dependency could not be imported.
    """
    if _has_h5:
        return
    raise ImportError("Install dirschema with [h5] extra for HDF5 support!")

28 

29 

30# NOTE: symlinks are currently ignored completely. 

31# The h5py visit function ignores them as well, 

32# and it is unclear what reasonable validation behavior would be. 

33# Either stick to this, or find reasonable semantics plus a workaround for HDF5. 

34 

35 

class IDirectory(ABC):
    """Abstract interface for things that behave like directories and files.

    An adapter is intended to be instantiated mainly using `for_path`,
    based on a regular path in the file system.

    Use the constructor to initialize an adapter for a more general data
    source (such as an object to work with an open archive file, etc.)
    """

    @abstractmethod
    def __init__(self, obj: object) -> None:
        """Initialize an instance for a suitable directory-like object."""

    @classmethod
    @abstractmethod
    def for_path(cls, path: Path):
        """Return an instance for a path to an archive file or a directory.

        Args:
            path: Path to a file or directory compatible with the adapter.
        """

    @abstractmethod
    def get_paths(self) -> Iterable[str]:
        """Return paths relative to root directory that are to be checked."""

    @abstractmethod
    def is_dir(self, path: str) -> bool:
        """Return whether the path is (like) a directory."""

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """Return whether the path is (like) a file."""

    @abstractmethod
    def open_file(self, path: str) -> Optional[IO[bytes]]:
        """Try loading data from file at given path (to perform validation on it)."""

    def decode_json(self, data: IO[bytes], path: str) -> Optional[Any]:
        """Try parsing binary data stream as JSON (to perform validation on it).

        Second argument is the path of the opened stream.

        Default implementation will first try parsing as JSON, then as YAML.

        Args:
            data: Readable stream with the serialized metadata.
            path: Path the stream was opened from (unused by this default).

        Returns:
            The parsed data, or None if it is neither valid JSON nor valid YAML.
        """
        # Read the stream exactly once: a failed JSON attempt would otherwise
        # leave it exhausted, and the YAML fallback would only see empty input.
        raw = data.read()
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass

        # Imported lazily; only needed when the JSON attempt failed.
        from ruamel.yaml.error import YAMLError

        # Hand the parser a fresh stream, matching the kind of input it got before.
        stream = io.StringIO(raw) if isinstance(raw, str) else io.BytesIO(raw)
        try:
            return yaml.load(stream)
        except YAMLError:  # covers parser, scanner and reader errors
            return None

    def load_meta(self, path: str) -> Optional[Any]:
        """Use open_file and decode_json to load JSON metadata.

        Args:
            path: Path of the metadata file, relative to the adapter root.

        Returns:
            The decoded metadata, or None if the file could not be opened
            or its content could not be decoded.
        """
        stream = self.open_file(path)
        if stream is None:
            return None
        # Make sure the handle is released even if decoding raises.
        with stream:
            return self.decode_json(stream, path)

94 

95 

class RealDir(IDirectory):
    """Pass-through implementation for working with actual file system."""

    def __init__(self, path: Path) -> None:
        """Initialize adapter for a plain directory path."""
        self.base = path

    @classmethod
    def for_path(cls, path: Path):
        """See [dirschema.adapters.IDirectory.for_path][]."""
        return cls(path)

    def get_paths(self) -> Iterable[str]:
        """See [dirschema.adapters.IDirectory.get_paths][].

        Yields the empty string for the root first, followed by all entries
        below the base directory in sorted order. Symlinks are skipped.
        """
        entries = sorted(self.base.rglob("*"))
        relative = (
            str(entry.relative_to(self.base))
            for entry in entries
            if not entry.is_symlink()
        )
        return itertools.chain([""], relative)

    def open_file(self, path: str) -> Optional[IO[bytes]]:
        """See [dirschema.adapters.IDirectory.open_file][].

        Returns None if the path does not exist or is not a file.
        """
        target = self.base / path
        try:
            return open(target, "rb")
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            # NotADirectoryError: a parent component of the path is a regular
            # file, so the requested path cannot exist either.
            return None
        except PermissionError:
            # Windows reports an attempt to open a directory as a permission
            # error; genuine permission problems are still propagated.
            if target.is_dir():
                return None
            raise

    def is_dir(self, dir: str) -> bool:
        """See [dirschema.adapters.IDirectory.is_dir][]."""
        return (self.base / dir).is_dir()

    def is_file(self, dir: str) -> bool:
        """See [dirschema.adapters.IDirectory.is_file][]."""
        return (self.base / dir).is_file()

129 

130 

class ZipDir(IDirectory):
    """Adapter for working with zip files (otherwise equivalent to `RealDir`).

    Explicit directory entries are optional in zip archives. Directories that
    are only implied by the name of a contained member are therefore registered
    as well, so that they are listed and recognized just like explicit ones.
    """

    def __init__(self, zip_file: zip.ZipFile):
        """Initialize adapter for a zip file."""
        self.file: zip.ZipFile = zip_file
        # Member names as stored in the archive; directories end with "/".
        self.names: Set[str] = set(self.file.namelist())
        # Add every parent directory implied by a member name.
        for member in tuple(self.names):
            parts = member.rstrip("/").split("/")
            for depth in range(1, len(parts)):
                self.names.add("/".join(parts[:depth]) + "/")
        # The root directory always exists.
        self.names.add("/")

    @classmethod
    def for_path(cls, path: Path):
        """See [dirschema.adapters.IDirectory.for_path][]."""
        opened = zip.ZipFile(path, "r")  # auto-closed on GC, no need to do anything
        return cls(opened)

    def get_paths(self) -> Iterable[str]:
        """See [dirschema.adapters.IDirectory.get_paths][].

        Names are sorted and returned without trailing slash, so the root
        directory is represented by the empty string.
        """
        return (name.rstrip("/") for name in sorted(self.names))

    def open_file(self, path: str) -> Optional[IO[bytes]]:
        """See [dirschema.adapters.IDirectory.open_file][]."""
        try:
            return self.file.open(path)
        except (KeyError, IsADirectoryError):
            return None

    # as is_dir and is_file of zip.Path appear to work purely syntactically,
    # they're useless for us. We rather just lookup in the list of paths we need anyway

    def is_dir(self, dir: str) -> bool:
        """See [dirschema.adapters.IDirectory.is_dir][]."""
        # directories are stored with a trailing slash
        return dir.rstrip("/") + "/" in self.names

    def is_file(self, dir: str) -> bool:
        """See [dirschema.adapters.IDirectory.is_file][]."""
        # files are stored without a trailing slash
        return dir.rstrip("/") in self.names

169 

170 

class H5Dir(IDirectory):
    """Adapter for working with HDF5 files.

    Attributes do not fit nicely into the concept of just directories and files.
    The following conventions are used for checking attributes:

    An attribute 'attr' of some dataset or group '/a/b'
    is mapped to the path '/a/b@attr' and is interpreted as a file.

    Therefore, '@' MUST NOT be used in names of groups, datasets or attributes.

    Only JSON is supported for the metadata, assuming that HDF5 files are usually not
    constructed by hand (which is the main reason for YAML support in the other cases).

    All stored metadata entities must have a name ending with ".json"
    in order to distinguish them from plain strings. This is done because datasets
    or attributes are often used for storing simple values that could also be
    validated using a JSON Schema.
    """

    _ATTR_SEP = "@"
    """Separator used in paths to separate a HDF5 node from an attribute."""

    _JSON_SUF = ".json"
    """Suffix used in leaf nodes to distinguish strings from JSON-serialized data."""

    # The annotation is quoted on purpose: h5py is an optional dependency, and an
    # unquoted annotation would be evaluated while the class body is executed,
    # breaking the import of this module when h5py is not installed.
    def __init__(self, hdf5_file: "h5py.File") -> None:
        """Initialize adapter for a HDF5 file."""
        self.file: "h5py.File" = hdf5_file

    @classmethod
    def for_path(cls, dir: Path):
        """See [dirschema.adapters.IDirectory.for_path][].

        Raises:
            ImportError: If h5py is not installed.
        """
        _require_h5py()
        opened = h5py.File(dir, "r")  # auto-closed on GC, no need to do anything
        return cls(opened)

    def get_paths(self) -> Iterable[str]:
        """See [dirschema.adapters.IDirectory.get_paths][].

        Raises:
            ValueError: If the name of a group or dataset contains the
                attribute separator.
        """
        # root directory and its attributes come first
        ret = [""]
        ret.extend(f"{self._ATTR_SEP}{atr}" for atr in self.file["/"].attrs.keys())

        def collect(name: str) -> None:
            # called by h5py for each visited node; must return None to continue
            if self._ATTR_SEP in name:
                raise ValueError(f"Invalid name, must not contain {self._ATTR_SEP}!")
            ret.append(name)
            for atr in self.file[name].attrs.keys():
                ret.append(f"{name}{self._ATTR_SEP}{atr}")

        self.file.visit(collect)
        return ret

    def is_dir(self, path: str) -> bool:
        """See [dirschema.adapters.IDirectory.is_dir][]."""
        if path == "":
            return True  # root directory
        if self._ATTR_SEP in path or path not in self.file:
            return False  # not existing or is an attribute
        # anything that exists, but is not a group, is not a directory
        return isinstance(self.file[path], h5py.Group)

    def is_file(self, path: str) -> bool:
        """See [dirschema.adapters.IDirectory.is_file][]."""
        # Split on the first separator only, mirroring how get_paths builds
        # attribute paths (node + separator + attribute name).
        node, sep, attr = path.partition(self._ATTR_SEP)
        if sep:
            # attributes (treated like special files) exist
            # if underlying group/dataset exists
            node = node or "/"
            return node in self.file and attr in self.file[node].attrs
        # otherwise check it is a dataset (= "file")
        return path in self.file and isinstance(self.file[path], h5py.Dataset)

    def decode_json(self, data: IO[bytes], path: str) -> Optional[Any]:
        """See [dirschema.adapters.IDirectory.decode_json][].

        Only JSON is attempted. If the content parses to a JSON object but the
        path does not end with the JSON suffix, the raw bytes are returned
        instead of the parsed object.
        """
        bs = data.read()
        try:
            ret = json.loads(bs)
        except json.JSONDecodeError:
            return None

        if isinstance(ret, dict) and not path.endswith(self._JSON_SUF):
            return bs
        return ret

    def open_file(self, path: str) -> Optional[IO[bytes]]:
        """See [dirschema.adapters.IDirectory.open_file][].

        Returns:
            A stream with JSON-encoded bytes for attributes, a stream with the
            raw stored bytes for scalar datasets, or None if the path does not
            exist, is empty, or refers to a group or an array dataset.
        """
        # Split on the first separator only, so that unexpected extra
        # separators cannot make tuple unpacking fail.
        node, sep, attr = path.partition(self._ATTR_SEP)
        if sep:
            # try treating as attribute
            node = node or "/"
            if node not in self.file or attr not in self.file[node].attrs:
                return None

            dat = self.file[node].attrs[attr]
            if isinstance(dat, h5py.Empty):
                return None
            if isinstance(dat, str):
                if not path.endswith(self._JSON_SUF):
                    # Plain string -> JSON-encoded string. Use a real encoder so
                    # quotes, backslashes and control characters are escaped.
                    dat = json.dumps(dat, ensure_ascii=False)
            else:
                # presumably a numpy value here - NOTE(review): confirm that
                # attribute values other than str always provide tolist()
                dat = json.dumps(dat.tolist())
            return io.BytesIO(dat.encode("utf-8"))

        # check that the path exists and is a dataset, but not a numpy array
        if path not in self.file:
            return None

        node_obj = self.file[path]
        if not isinstance(node_obj, h5py.Dataset):
            return None

        bs: Any = node_obj[()]
        # Arrays are not file-like, and empty datasets carry no data at all.
        if isinstance(bs, (numpy.ndarray, h5py.Empty)):
            return None

        # the only kinds of datasets we accept are essentially utf-8 strings
        # which are represented as possibly wrapped bytes
        if isinstance(bs, numpy.void):  # void-wrapped bytes -> unpack
            bs = bs.tobytes()

        return io.BytesIO(bs)

297 

298 

299def get_adapter_for(path: Path) -> IDirectory: 

300 """Return suitable interface adapter based on path and file extension. 

301 

302 Args: 

303 path: Path to directory or archive file 

304 

305 Returns: 

306 An adapter instance that can be used for dirschema validation. 

307 

308 Raises: 

309 ValueError: If no suitable adapter was found for the path. 

310 """ 

311 if path.is_dir(): 

312 return RealDir.for_path(path) 

313 

314 if path.is_file(): 

315 if path.name.endswith("zip"): 

316 return ZipDir.for_path(path) 

317 elif path.name.endswith(("h5", "hdf5")): 

318 _require_h5py() 

319 return H5Dir.for_path(path) 

320 

321 raise ValueError(f"Found no suitable dirschema adapter for path: {path}")