Coverage for src/metador_core/container/wrappers.py: 91%

288 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-02 09:33 +0000

1from __future__ import annotations 

2 

3from itertools import takewhile 

4from typing import Any, Dict, MutableMapping, Optional, Set, Type, Union 

5 

6import h5py 

7import wrapt 

8 

9from ..util.types import H5DatasetLike, H5FileLike, H5GroupLike, H5NodeLike, OpenMode 

10from . import utils as M 

11from .drivers import MetadorDriver, to_h5filelike 

12from .interface import MetadorContainerTOC, MetadorMeta, NodeAcl, NodeAclFlags 

13 

14 

class UnsupportedOperationError(AttributeError):
    """Error for an operation that is unsupported, as opposed to a missing attribute.

    It derives from AttributeError, so code catching AttributeError still
    works, while callers who care can tell the two situations apart.
    """

17 

18 

class WrappedAttributeManager(wrapt.ObjectProxy):
    """Proxy for AttributeManager-like mappings that enforces node ACL flags.

    Prevents mutation of the attributes (read-only) and/or inspection of
    their values (skel-only), depending on the flags passed on construction.
    """

    __wrapped__: MutableMapping

    _self_acl: NodeAclFlags
    # for each restricting flag: names of regular methods that remain usable
    _self_acl_whitelist: Dict[NodeAcl, Set[str]] = {
        NodeAcl.skel_only: {"keys"},
        NodeAcl.read_only: {"keys", "values", "items", "get"},
    }
    _self_allowed: Set[str]

    def __init__(self, obj, acl: NodeAclFlags):
        """Wrap `obj` and derive the method whitelist from the ACL flags `acl`."""
        super().__init__(obj)
        self._self_acl = acl

        # the effective whitelist is the intersection of the whitelists of
        # all flags that are enabled and listed in the whitelist table
        permitted: Optional[Set[str]] = None
        for flag, enabled in acl.items():
            if enabled and flag in self._self_acl_whitelist:
                names = self._self_acl_whitelist[flag]
                if permitted is None:
                    permitted = names
                else:
                    permitted = permitted.intersection(names)
        # an empty whitelist means "nothing is filtered" (see __getattr__)
        self._self_allowed = permitted if permitted else set()

    def _raise_illegal_op(self, flag_info: str):
        """Raise the error for an operation forbidden by the flag(s) `flag_info`."""
        msg = f"This attribute set belongs to a node marked as {flag_info}!"
        raise UnsupportedOperationError(msg)

    def _forbid_when(self, flag: NodeAcl):
        """Raise if the given ACL flag is enabled for this attribute set."""
        if self._self_acl[flag]:
            self._raise_illegal_op(flag.name)

    def __getattr__(self, key: str):
        # NOTE: special methods such as __contains__ are not looked up through
        # here and thus stay unrestricted (which is desired behavior).
        if hasattr(self.__wrapped__, key):
            whitelist = self._self_allowed
            if whitelist and key not in whitelist:
                self._raise_illegal_op(str(self._self_acl))
        return getattr(self.__wrapped__, key)

    # the item access methods below are not intercepted by __getattr__

    def __getitem__(self, key: str):
        self._forbid_when(NodeAcl.skel_only)
        return self.__wrapped__.__getitem__(key)

    def __setitem__(self, key: str, value):
        self._forbid_when(NodeAcl.read_only)
        return self.__wrapped__.__setitem__(key, value)

    def __delitem__(self, key: str):
        self._forbid_when(NodeAcl.read_only)
        return self.__wrapped__.__delitem__(key)

    def __repr__(self) -> str:
        # show the wrapped object, keeping the proxy transparent
        return repr(self.__wrapped__)

81 

82 

class WithDefaultQueryStartNode(wrapt.ObjectProxy):
    """Cosmetic wrapper to search metadata below a H5LikeGroup.

    Used to make sure that:
        `group.metador.query(...)`
    is equivalent to:
        `container.metador.query(..., node=group)`

    (without it, the default node will be the root "/" if not specified).
    """

    __wrapped__: MetadorContainerTOC

    def __init__(self, obj: MetadorContainerTOC, def_start_node):
        """Wrap the TOC `obj`, remembering `def_start_node` as default query root."""
        super().__init__(obj)
        self._self_query_start_node = def_start_node

    def query(self, schema, version=None, *, node=None):
        """Forward to the wrapped `query`, filling in the default start node.

        The default is used only if `node` is not passed (i.e. is None). An
        explicitly passed node is always respected, even if the object
        happens to evaluate as falsy.
        """
        if node is None:
            node = self._self_query_start_node
        return self.__wrapped__.query(schema, version, node=node)

103 

104 

class MetadorNode(wrapt.ObjectProxy):
    """Wrapper for h5py and IH5 Groups and Datasets providing Metador-specific features.

    In addition to the Metadata management, also provides helpers to reduce possible
    mistakes in implementing interfaces by allowing to mark nodes as

    * read_only (regardless of the writability of the underlying opened container) and
    * local_only (preventing access to (meta)data above this node)

    Note that these are "soft" restrictions to prevent errors and can be bypassed.
    """

    __wrapped__: H5NodeLike

    @staticmethod
    def _parse_access_flags(kwargs) -> NodeAclFlags:
        """Extract the ACL flags from `kwargs`, each missing flag defaults to False.

        NOTE: mutates `kwargs`, every key named like a NodeAcl member is removed!
        """
        return {flag: kwargs.pop(flag.name, False) for flag in NodeAcl}

    def __init__(self, mc: MetadorContainer, node: H5NodeLike, **kwargs):
        """Wrap `node`, which is accessed through the container `mc`.

        Accepted keyword arguments are the names of the NodeAcl flags and
        `local_parent`, anything else raises a ValueError.
        """
        flags = self._parse_access_flags(kwargs)
        lp = kwargs.pop("local_parent", None)
        if kwargs:
            raise ValueError(f"Unknown keyword arguments: {kwargs}")

        super().__init__(node)
        self._self_container: MetadorContainer = mc

        self._self_flags: NodeAclFlags = flags
        self._self_local_parent: Optional[MetadorGroup] = lp

    def _child_node_kwargs(self):
        """Return kwargs to be passed to a child node.

        Ensures that {read,skel,local}_only status is passed down correctly.
        """
        return {
            "local_parent": self if self.acl[NodeAcl.local_only] else None,
            **{k.name: v for k, v in self.acl.items() if v},
        }

    def restrict(self, **kwargs) -> MetadorNode:
        """Restrict this object to be local_only or read_only.

        Pass local_only=True and/or read_only=True to enable the restriction.

        local_only means that the node may not access the parent or file objects.
        read_only means that mutable actions cannot be done (even if container is mutable).

        Raises a ValueError on unknown keyword arguments, in that case the
        node is left unchanged. Returns the node itself.
        """
        added_flags = self._parse_access_flags(kwargs)
        # validate before touching any state, so that a rejected call
        # does not leave the node partially restricted
        if kwargs:
            raise ValueError(f"Unknown keyword arguments: {kwargs}")

        if added_flags[NodeAcl.local_only]:
            # was set as local explicitly for this node ->
            self._self_local_parent = None  # remove its ability to go up

        # can only set, but not unset!
        self._self_flags.update({k: True for k, v in added_flags.items() if v})
        return self

    @property
    def acl(self) -> Dict[NodeAcl, bool]:
        """Return ACL flags of current node (as a copy)."""
        return dict(self._self_flags)

    def _guard_path(self, path: str):
        """Raise ValueError if `path` is Metador-internal or violates local_only."""
        if M.is_internal_path(path):
            msg = f"Trying to use a Metador-internal path: '{path}'"
            raise ValueError(msg)
        # slice instead of index, so an empty path does not raise IndexError
        if self.acl[NodeAcl.local_only] and path[:1] == "/":
            msg = f"Node is marked as local_only, cannot use absolute path '{path}'!"
            raise ValueError(msg)

    def _guard_acl(self, flag: NodeAcl, method: str = "this method"):
        """Raise UnsupportedOperationError if `flag` is enabled for this node."""
        if self.acl[flag]:
            msg = f"Cannot use {method}, the node is marked as {flag.name}!"
            raise UnsupportedOperationError(msg)

    # helpers

    def _wrap_if_node(self, val):
        """Wrap value into a metador node wrapper, if it is a suitable group or dataset."""
        if isinstance(val, H5GroupLike):
            return MetadorGroup(self._self_container, val, **self._child_node_kwargs())
        elif isinstance(val, H5DatasetLike):
            return MetadorDataset(
                self._self_container, val, **self._child_node_kwargs()
            )
        else:
            return val

    def _destroy_meta(self, _unlink: bool = True):
        """Destroy all attached metadata at and below this node."""
        self.meta._destroy(_unlink=_unlink)

    # need that to add our new methods

    def __dir__(self):
        names = set(super().__dir__())
        # add names defined by this class hierarchy, walking the MRO only
        # as long as the classes are MetadorNode (sub)classes
        for cls in takewhile(lambda c: issubclass(c, MetadorNode), type(self).mro()):
            names.update(cls.__dict__)
        return list(names)

    # make wrapper transparent

    def __repr__(self):
        return repr(self.__wrapped__)

    # added features

    @property
    def meta(self) -> MetadorMeta:
        """Access the interface to metadata attached to this node."""
        return MetadorMeta(self)

    @property
    def metador(self) -> MetadorContainerTOC:
        """Access the info about the container this node belongs to."""
        return WithDefaultQueryStartNode(self._self_container.metador, self)

    # wrap existing methods as needed

    @property
    def name(self) -> str:
        return self.__wrapped__.name  # just for type checker not to complain

    @property
    def attrs(self):
        """Return the node attributes, wrapped if the node is read_only or skel_only."""
        if self.acl[NodeAcl.read_only] or self.acl[NodeAcl.skel_only]:
            return WrappedAttributeManager(self.__wrapped__.attrs, self.acl)
        return self.__wrapped__.attrs

    @property
    def parent(self) -> MetadorGroup:
        """Return the wrapped parent group, respecting the local_only flag."""
        if self.acl[NodeAcl.local_only]:
            # allow child nodes of local-only nodes to go up to the marked parent
            # (or it is None, if this is the local root)
            lp = self._self_local_parent
            if lp is not None:
                return lp
            # raise exception (illegal non-local access)
            self._guard_acl(NodeAcl.local_only, "parent")

        return MetadorGroup(
            self._self_container,
            self.__wrapped__.parent,
            **self._child_node_kwargs(),
        )

    @property
    def file(self) -> MetadorContainer:
        """Return the container of this node, unless the node is local_only."""
        # raises exception if local_only is set (illegal non-local access)
        self._guard_acl(NodeAcl.local_only, "file")
        return self._self_container

264 

265 

class MetadorDataset(MetadorNode):
    """Metador wrapper for a HDF5 Dataset."""

    __wrapped__: H5DatasetLike

    # manually assembled from public methods which h5py.Dataset provides
    _self_RO_FORBIDDEN = {"resize", "make_scale", "write_direct", "flush"}

    def __getattr__(self, key):
        # mutating methods are refused if the node is read_only
        if key in self._self_RO_FORBIDDEN:
            self._guard_acl(NodeAcl.read_only, key)
        # value access via get is refused if the node is skel_only
        if key == "get":
            self._guard_acl(NodeAcl.skel_only, key)

        return getattr(self.__wrapped__, key)

    def __getitem__(self, *args, **kwargs):
        # reading values is refused if the node is marked as skel_only
        self._guard_acl(NodeAcl.skel_only, "__getitem__")
        raw_getitem = self.__wrapped__.__getitem__
        return raw_getitem(*args, **kwargs)

    def __setitem__(self, *args, **kwargs):
        # writing values is refused if the node is marked as read_only
        self._guard_acl(NodeAcl.read_only, "__setitem__")
        raw_setitem = self.__wrapped__.__setitem__
        return raw_setitem(*args, **kwargs)

292 

293 

294# TODO: can this be done somehow with wrapt.decorator but still without boilerplate? 

295# problem is it wants a function, but we need to look it up by name first 

296# so we hand-roll the decorator for now. 

def _wrap_method(method: str, is_read_only_method: bool = False):
    """Wrap a method called on a HDF5 entity.

    Prevents user from creating or deleting reserved entities/names by hand.
    Ensures that a wrapped Group/Dataset is returned.

    If is_read_only_method=False and the object is read_only, refuses to call
    the method.

    Args:
        method: Name of the method to look up on the raw wrapped object.
        is_read_only_method: Whether the method only reads, i.e. may be used
            on nodes marked as read_only.

    Returns:
        Function taking the wrapper instance and the entity name (plus any
        further arguments), suitable for use as a method of a node wrapper.
    """

    def wrapped_method(obj, name, *args, **kwargs):
        # obj will be the self of the wrapper instance
        obj._guard_path(name)
        if not is_read_only_method:
            obj._guard_acl(NodeAcl.read_only, method)
        ret = getattr(obj.__wrapped__, method)(name, *args, **kwargs)  # RAW
        return obj._wrap_if_node(ret)

    # name the wrapper after the wrapped method, otherwise all generated
    # wrappers look identical ("wrapped_method") in repr() and help()
    wrapped_method.__name__ = method
    wrapped_method.__qualname__ = method
    return wrapped_method

315 

316 

# classes of h5py reference-like types (we don't support that)
_H5_REF_TYPES = [
    h5py.HardLink,
    h5py.SoftLink,
    h5py.ExternalLink,
    h5py.h5r.Reference,
]

319 

320 

class MetadorGroup(MetadorNode):
    """Wrapper for a HDF5 Group."""

    __wrapped__: H5GroupLike

    def _destroy_meta(self, _unlink: bool = True):
        """Destroy all attached metadata at and below this node (recursively)."""
        super()._destroy_meta(_unlink=_unlink)  # this node
        for child in self.values():  # recurse
            child._destroy_meta(_unlink=_unlink)

    # these access entities in read-only way:

    get = _wrap_method("get", is_read_only_method=True)
    __getitem__ = _wrap_method("__getitem__", is_read_only_method=True)

    # these just create new entities with no metadata attached:

    create_group = _wrap_method("create_group")
    require_group = _wrap_method("require_group")
    create_dataset = _wrap_method("create_dataset")
    require_dataset = _wrap_method("require_dataset")

    def __setitem__(self, name, value):
        """Assign `value` to `name`, refusing h5py link and reference objects."""
        if isinstance(value, tuple(_H5_REF_TYPES)):
            raise ValueError(f"Unsupported reference type: {type(value).__name__}")

        return _wrap_method("__setitem__")(self, name, value)

    # following all must be filtered to hide metador-specific structures:

    # must wrap nodes passed into the callback function and filter visited names
    def visititems(self, func):
        def wrapped_func(name, node):
            if M.is_internal_path(node.name):
                return  # skip path/node
            return func(name, self._wrap_if_node(node))

        return self.__wrapped__.visititems(wrapped_func)  # RAW

    # paths passed to visit also must be filtered, so must override this one too
    def visit(self, func):
        return self.visititems(lambda name, _: func(name))

    # following also depend on the filtered sequence, directly
    # filter the items, derive other related functions based on that

    def items(self):
        """Yield (name, wrapped node) pairs of children, hiding internal nodes."""
        for k, v in self.__wrapped__.items():
            if v is None:
                # NOTE: e.g. when nodes are deleted/moved during iteration,
                # v can suddenly be None -> we need to catch this case!
                continue
            if not M.is_internal_path(v.name):
                yield (k, self._wrap_if_node(v))

    def values(self):
        return (node for _, node in self.items())

    def keys(self):
        return (name for name, _ in self.items())

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        # count lazily, no need to materialize the keys as a list
        return sum(1 for _ in self.keys())

    def __contains__(self, name: str):
        """Return whether the (possibly nested) path `name` exists and is visible."""
        self._guard_path(name)
        # slice instead of index, so an empty name does not raise IndexError
        if name[:1] == "/" and self.name != "/":
            return name in self["/"]
        segs = name.lstrip("/").split("/")
        if len(segs) == 1:
            return segs[0] in self.keys()
        # descend into the first segment, look up the remaining path there
        nxt = self.get(segs[0])
        if nxt is None:
            return False
        return "/".join(segs[1:]) in nxt

    # these we can take care of but are a bit more tricky to think through

    def __delitem__(self, name: str):
        """Delete the node at `name` together with the metadata attached to it."""
        self._guard_acl(NodeAcl.read_only, "__delitem__")
        self._guard_path(name)

        node = self[name]
        # clean up metadata (recursively, if a group)
        node._destroy_meta()
        # kill the actual data
        return _wrap_method("__delitem__")(self, name)

    def move(self, source: str, dest: str):
        """Move the node at `source` to `dest`, taking its metadata along."""
        self._guard_acl(NodeAcl.read_only, "move")
        self._guard_path(source)
        self._guard_path(dest)

        src_metadir = self[source].meta._base_dir
        # if actual data move fails, an exception will prevent the rest
        self.__wrapped__.move(source, dest)  # RAW

        # if we're here, no problems -> proceed with moving metadata
        dst_node = self[dest]
        if isinstance(dst_node, MetadorDataset):
            dst_metadir = dst_node.meta._base_dir
            # dataset has its metadata stored in parallel -> need to take care of it
            meta_base = dst_metadir
            if src_metadir in self.__wrapped__:  # RAW
                self.__wrapped__.move(src_metadir, dst_metadir)  # RAW
        else:
            # directory where to fix up metadata object TOC links
            # when a group was moved, all metadata is contained in dest -> search it
            meta_base = dst_node.name

        # re-link metadata object TOC links
        meta_base_node = self.__wrapped__.get(meta_base)
        if meta_base_node is not None:
            assert isinstance(meta_base_node, H5GroupLike)
            missing = self._self_container.metador._links.find_missing(meta_base_node)
            self._self_container.metador._links.repair_missing(missing, update=True)

    def copy(
        self,
        source: Union[str, MetadorGroup, MetadorDataset],
        dest: Union[str, MetadorGroup],
        **kwargs,
    ):
        """Copy `source` to `dest`, by default including the attached metadata.

        Supported keyword arguments:
            name: name of the copy, only used if `dest` is a group
                (default: last path segment of the source node name)
            without_attrs: passed through to the raw copy (default: False)
            without_meta: if True, the copy will have no metadata (default: False)

        Raises a ValueError on invalid source/dest types or unknown keyword arguments.
        """
        self._guard_acl(NodeAcl.read_only, "copy")

        # get source node and its name without the path and its type
        src_node: MetadorNode
        if isinstance(source, str):
            self._guard_path(source)
            src_node = self[source]
        elif isinstance(source, MetadorNode):
            src_node = source
        else:
            raise ValueError("Copy source must be path, Group or Dataset!")
        src_is_dataset: bool = isinstance(src_node, MetadorDataset)
        src_name: str = src_node.name.split("/")[-1]
        # user can override name at target
        dst_name: str = kwargs.pop("name", src_name)

        # fix up target path
        dst_path: str
        if isinstance(dest, str):
            self._guard_path(dest)
            dst_path = dest
        elif isinstance(dest, MetadorGroup):
            dst_path = dest.name + f"/{dst_name}"
        else:
            raise ValueError("Copy dest must be path or Group!")

        # get other allowed options
        without_attrs: bool = kwargs.pop("without_attrs", False)
        without_meta: bool = kwargs.pop("without_meta", False)
        if kwargs:
            raise ValueError(f"Unknown keyword arguments: {kwargs}")

        # perform copy
        copy_kwargs = {
            "name": None,
            "shallow": False,
            "expand_soft": True,
            "expand_external": True,
            "expand_refs": True,
            "without_attrs": without_attrs,
        }
        self.__wrapped__.copy(source, dst_path, **copy_kwargs)  # RAW
        dst_node = self[dst_path]  # exists now

        if src_is_dataset and not without_meta:
            # because metadata lives in parallel group, need to copy separately:
            src_meta: str = src_node.meta._base_dir
            dst_meta: str = dst_node.meta._base_dir  # node will not exist yet
            # same existence check as in move() -- presumably the metadata
            # group is missing if the dataset has no metadata (TODO confirm)
            if src_meta in self.__wrapped__:  # RAW
                self.__wrapped__.copy(src_meta, dst_meta, **copy_kwargs)  # RAW

                # register in TOC:
                dst_meta_node = self.__wrapped__[dst_meta]
                assert isinstance(dst_meta_node, H5GroupLike)
                missing = self._self_container.metador._links.find_missing(
                    dst_meta_node
                )
                self._self_container.metador._links.repair_missing(missing)

        if not src_is_dataset:
            if without_meta:
                # need to destroy copied metadata copied with the source group
                # but keep TOC links (they point to original copy!)
                dst_node._destroy_meta(_unlink=False)
            else:
                # register copied metadata objects under new uuids
                missing = self._self_container.metador._links.find_missing(dst_node)
                self._self_container.metador._links.repair_missing(missing)

    def __getattr__(self, key):
        if hasattr(self.__wrapped__, key):
            raise UnsupportedOperationError(key)  # deliberately unsupported
        else:
            msg = f"'{type(self).__name__}' object has no attribute '{key}'"
            raise AttributeError(msg)

523 

524 

525# ---- 

526 

527 

class MetadorContainer(MetadorGroup):
    """Wrapper class adding Metador container interface to h5py.File-like objects.

    The wrapper ensures that any actions done to IH5Records through this interface
    also work with plain h5py.Files.

    There are no guarantees about behaviour with h5py methods not supported by IH5Records.

    Given `old: MetadorContainer`, `MetadorContainer(old.data_source, driver=old.data_driver)`
    should be able to construct another object to access the same data (assuming it is not locked).
    """

    __wrapped__: H5FileLike

    # names of file-level attributes that are forwarded to the wrapped object
    _self_SUPPORTED: Set[str] = {"mode", "flush", "close"}

    # ---- new container-level interface ----

    _self_toc: MetadorContainerTOC

    @property
    def metador(self) -> MetadorContainerTOC:
        """Access interface to Metador metadata object index."""
        return self._self_toc

    def __init__(
        self,
        name_or_obj: Union[MetadorDriver, Any],
        mode: Optional[OpenMode] = "r",
        *,
        # NOTE: driver takes class instead of enum to also allow subclasses
        driver: Optional[Type[MetadorDriver]] = None,
    ):
        """Initialize a MetadorContainer instance from file(s) or a supported object.

        The `mode` argument is ignored when simply wrapping an object.

        If a data source such as a path is passed, will instantiate the object first,
        using the default H5File driver or the passed `driver` keyword argument.
        """
        h5file = to_h5filelike(name_or_obj, mode, driver=driver)
        # wrap the h5file-like object (will set self.__wrapped__),
        # the container is its own container reference
        super().__init__(self, h5file)
        # initialize metador-specific stuff
        self._self_toc = MetadorContainerTOC(self)

    # not clear if we want these in the public interface. keep this private for now:

    # def _find_orphan_meta(self) -> List[str]:
    #     """Return list of paths to metadata that has no corresponding user node anymore."""
    #     ret: List[str] = []

    #     def collect_orphans(name: str):
    #         if M.is_meta_base_path(name):
    #             if M.to_data_node_path(name) not in self:
    #                 ret.append(name)

    #     self.__wrapped__.visit(collect_orphans)
    #     return ret

    # def _repair(self, remove_orphans: bool = False):
    #     """Repair container structure on best-effort basis.

    #     This will ensure that the TOC points to existing metadata objects
    #     and that all metadata objects are listed in the TOC.

    #     If remove_orphans is set, will erase metadata not belonging to an existing node.

    #     Notice that missing schema plugin dependency metadata cannot be restored.
    #     """
    #     if remove_orphans:
    #         for path in self._find_orphan_meta():
    #             del self.__wrapped__[path]
    #     self.toc._links.find_broken(repair=True)
    #     missing = self.toc._links._find_missing("/")
    #     self.toc._links.repair_missing(missing)

    # ---- pass through HDF5 group methods to a wrapped root group instance ----

    def __getattr__(self, key: str):
        if key not in self._self_SUPPORTED:
            return super().__getattr__(key)  # ask group for method
        return getattr(self.__wrapped__, key)

    # context manager: return the wrapper back, not the raw thing:

    def __enter__(self):
        self.__wrapped__.__enter__()
        return self

    def __exit__(self, *args):
        return self.__wrapped__.__exit__(*args)

    # we want these also to be forwarded to the wrapped group, not the raw object:

    def __dir__(self):
        names = set(super().__dir__())
        names.update(type(self).__dict__)
        return list(names)

    # make wrapper transparent:

    def __repr__(self) -> str:
        return repr(self.__wrapped__)  # shows that its a File, not just a Group

628 return repr(self.__wrapped__) # shows that its a File, not just a Group