Coverage for src/metador_core/container/wrappers.py: 91%
288 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
1from __future__ import annotations
3from itertools import takewhile
4from typing import Any, Dict, MutableMapping, Optional, Set, Type, Union
6import h5py
7import wrapt
9from ..util.types import H5DatasetLike, H5FileLike, H5GroupLike, H5NodeLike, OpenMode
10from . import utils as M
11from .drivers import MetadorDriver, to_h5filelike
12from .interface import MetadorContainerTOC, MetadorMeta, NodeAcl, NodeAclFlags
class UnsupportedOperationError(AttributeError):
    """Signal that an attribute or operation is deliberately not supported.

    It derives from AttributeError, so generic attribute handling keeps
    working, while callers that care can tell "unsupported on purpose"
    apart from an attribute that is actually missing.
    """
class WrappedAttributeManager(wrapt.ObjectProxy):
    """Proxy for AttributeManager-like mappings that enforces node ACL flags.

    Depending on the flags, the attributes cannot be mutated (read-only)
    or their values cannot be inspected at all (skel-only).
    """

    __wrapped__: MutableMapping

    _self_acl: NodeAclFlags
    # for each restricting flag: names of regular (non-special) attributes
    # of the wrapped mapping that remain accessible
    _self_acl_whitelist: Dict[NodeAcl, Set[str]] = {
        NodeAcl.skel_only: {"keys"},
        NodeAcl.read_only: {"keys", "values", "items", "get"},
    }
    _self_allowed: Set[str]

    def __init__(self, obj, acl: NodeAclFlags):
        """Wrap `obj` and derive the set of allowed attribute names from `acl`."""
        super().__init__(obj)
        self._self_acl = acl

        # effective whitelist = intersection of the whitelists of all
        # enabled flags that define one
        permitted: Optional[Set[str]] = None
        for flag, enabled in acl.items():
            whitelist = self._self_acl_whitelist.get(flag) if enabled else None
            if whitelist is None:
                continue
            permitted = whitelist if permitted is None else permitted & whitelist
        self._self_allowed = permitted if permitted else set()

    def _raise_illegal_op(self, flag_info: str):
        """Raise UnsupportedOperationError, naming the responsible flag(s)."""
        raise UnsupportedOperationError(
            f"This attribute set belongs to a node marked as {flag_info}!"
        )

    def __getattr__(self, key: str):
        # NOTE: special methods such as __contains__ are not looked up
        # through __getattr__, so they stay unrestricted (desired behavior).
        wrapped = self.__wrapped__
        blocked = (
            hasattr(wrapped, key)
            and bool(self._self_allowed)
            and key not in self._self_allowed
        )
        if blocked:
            self._raise_illegal_op(str(self._self_acl))
        return getattr(wrapped, key)

    # item access is not intercepted by __getattr__, so guard it explicitly

    def __getitem__(self, key: str):
        """Return the attribute value, unless the node is skel_only."""
        if self._self_acl[NodeAcl.skel_only]:
            self._raise_illegal_op(NodeAcl.skel_only.name)
        return self.__wrapped__.__getitem__(key)

    def __setitem__(self, key: str, value):
        """Set the attribute value, unless the node is read_only."""
        if self._self_acl[NodeAcl.read_only]:
            self._raise_illegal_op(NodeAcl.read_only.name)
        return self.__wrapped__.__setitem__(key, value)

    def __delitem__(self, key: str):
        """Delete the attribute, unless the node is read_only."""
        if self._self_acl[NodeAcl.read_only]:
            self._raise_illegal_op(NodeAcl.read_only.name)
        return self.__wrapped__.__delitem__(key)

    def __repr__(self) -> str:
        # show the wrapped object, keeping the proxy transparent
        return repr(self.__wrapped__)
class WithDefaultQueryStartNode(wrapt.ObjectProxy):
    """Cosmetic wrapper to search metadata below a H5LikeGroup.

    Used to make sure that:
        `group.metador.query(...)`
    is equivalent to:
        `container.metador.query(..., node=group)`

    (without it, the default node will be the root "/" if not specified).
    """

    __wrapped__: MetadorContainerTOC

    def __init__(self, obj: MetadorContainerTOC, def_start_node):
        """Wrap the TOC `obj` and remember `def_start_node` as default query start."""
        super().__init__(obj)
        self._self_query_start_node = def_start_node

    def query(self, schema, version=None, *, node=None):
        """Forward to the wrapped `query`, defaulting `node` to the stored start node.

        The default is applied only when `node` is None. An explicitly passed
        node is always respected, even if it evaluates as falsy (node wrappers
        are proxies, so their truthiness is decided by the wrapped object).
        """
        if node is None:
            node = self._self_query_start_node
        return self.__wrapped__.query(schema, version, node=node)
class MetadorNode(wrapt.ObjectProxy):
    """Wrapper for h5py and IH5 Groups and Datasets providing Metador-specific features.

    In addition to the Metadata management, also provides helpers to reduce possible
    mistakes in implementing interfaces by allowing to mark nodes as

    * read_only (regardless of the writability of the underlying opened container) and
    * local_only (preventing access to (meta)data above this node)

    Note that these are "soft" restrictions to prevent errors and can be bypassed.
    """

    __wrapped__: H5NodeLike

    @staticmethod
    def _parse_access_flags(kwargs) -> NodeAclFlags:
        """Extract one boolean per NodeAcl flag from kwargs (default: False)."""
        # NOTE: mutating kwargs, removes keys that are inspected!
        return {flag: kwargs.pop(flag.name, False) for flag in NodeAcl}

    def __init__(self, mc: MetadorContainer, node: H5NodeLike, **kwargs):
        """Wrap `node`, which belongs to container `mc`.

        Accepted keyword arguments are the NodeAcl flag names and
        `local_parent`; anything else raises ValueError.
        """
        flags = self._parse_access_flags(kwargs)
        lp = kwargs.pop("local_parent", None)
        if kwargs:
            raise ValueError(f"Unknown keyword arguments: {kwargs}")

        super().__init__(node)
        self._self_container: MetadorContainer = mc

        self._self_flags: NodeAclFlags = flags
        self._self_local_parent: Optional[MetadorGroup] = lp

    def _child_node_kwargs(self):
        """Return kwargs to be passed to a child node.

        Ensures that {read,skel,local}_only status is passed down correctly.
        """
        acl = self.acl
        return {
            "local_parent": self if acl[NodeAcl.local_only] else None,
            **{k.name: v for k, v in acl.items() if v},
        }

    def restrict(self, **kwargs) -> MetadorNode:
        """Restrict this object to be local_only or read_only.

        Pass local_only=True and/or read_only=True to enable the restriction.

        local_only means that the node may not access the parent or file objects.
        read_only means that mutable actions cannot be done (even if container is mutable).

        Flags can only be set, never unset. Unknown keyword arguments raise
        ValueError before any state of this node is changed.
        """
        added_flags = self._parse_access_flags(kwargs)
        # validate first, so a bad call does not leave the node half-updated
        if kwargs:
            raise ValueError(f"Unknown keyword arguments: {kwargs}")

        if added_flags[NodeAcl.local_only]:
            # was set as local explicitly for this node ->
            self._self_local_parent = None  # remove its ability to go up

        # can only set, but not unset!
        self._self_flags.update({k: True for k, v in added_flags.items() if v})
        return self

    @property
    def acl(self) -> Dict[NodeAcl, bool]:
        """Return ACL flags of current node."""
        # a copy, so callers cannot change the flags of this node through it
        return dict(self._self_flags)

    def _guard_path(self, path: str):
        """Raise ValueError for Metador-internal paths and illegal absolute paths."""
        if M.is_internal_path(path):
            msg = f"Trying to use a Metador-internal path: '{path}'"
            raise ValueError(msg)
        # slicing instead of indexing: an empty path must not raise IndexError
        if self.acl[NodeAcl.local_only] and path[:1] == "/":
            msg = f"Node is marked as local_only, cannot use absolute path '{path}'!"
            raise ValueError(msg)

    def _guard_acl(self, flag: NodeAcl, method: str = "this method"):
        """Raise UnsupportedOperationError if `flag` is set on this node."""
        if self.acl[flag]:
            msg = f"Cannot use {method}, the node is marked as {flag.name}!"
            raise UnsupportedOperationError(msg)

    # helpers

    def _wrap_if_node(self, val):
        """Wrap value into a metador node wrapper, if it is a suitable group or dataset."""
        if isinstance(val, H5GroupLike):
            return MetadorGroup(self._self_container, val, **self._child_node_kwargs())
        elif isinstance(val, H5DatasetLike):
            return MetadorDataset(
                self._self_container, val, **self._child_node_kwargs()
            )
        else:
            return val

    def _destroy_meta(self, _unlink: bool = True):
        """Destroy all attached metadata at and below this node."""
        self.meta._destroy(_unlink=_unlink)

    # need that to add our new methods

    def __dir__(self):
        # collect names defined on the wrapper classes themselves, i.e. the
        # leading part of the MRO up to and including MetadorNode
        own_classes = takewhile(
            lambda cls: issubclass(cls, MetadorNode), type(self).mro()
        )
        names = set().union(*(vars(cls) for cls in own_classes))
        return list(set(super().__dir__()).union(names))

    # make wrapper transparent

    def __repr__(self):
        return repr(self.__wrapped__)

    # added features

    @property
    def meta(self) -> MetadorMeta:
        """Access the interface to metadata attached to this node."""
        return MetadorMeta(self)

    @property
    def metador(self) -> MetadorContainerTOC:
        """Access the info about the container this node belongs to."""
        return WithDefaultQueryStartNode(self._self_container.metador, self)

    # wrap existing methods as needed

    @property
    def name(self) -> str:
        return self.__wrapped__.name  # just for type checker not to complain

    @property
    def attrs(self):
        """Return the node attributes, wrapped if the node is read_only or skel_only."""
        acl = self.acl
        if acl[NodeAcl.read_only] or acl[NodeAcl.skel_only]:
            return WrappedAttributeManager(self.__wrapped__.attrs, acl)
        return self.__wrapped__.attrs

    @property
    def parent(self) -> MetadorGroup:
        """Return the wrapped parent group.

        For local_only nodes, only the remembered local parent is
        reachable; at the local root UnsupportedOperationError is raised.
        """
        if self.acl[NodeAcl.local_only]:
            # allow child nodes of local-only nodes to go up to the marked parent
            # (or it is None, if this is the local root)
            lp = self._self_local_parent
            if lp is not None:
                return lp
            # raise exception (illegal non-local access)
            self._guard_acl(NodeAcl.local_only, "parent")

        return MetadorGroup(
            self._self_container,
            self.__wrapped__.parent,
            **self._child_node_kwargs(),
        )

    @property
    def file(self) -> MetadorContainer:
        """Return the container this node belongs to (forbidden for local_only nodes)."""
        # raises exception for local_only nodes (illegal non-local access)
        self._guard_acl(NodeAcl.local_only, "file")
        return self._self_container
class MetadorDataset(MetadorNode):
    """Metador wrapper for a HDF5 Dataset."""

    __wrapped__: H5DatasetLike

    # manually assembled from public methods which h5py.Dataset provides
    _self_RO_FORBIDDEN = {"resize", "make_scale", "write_direct", "flush"}

    def __getattr__(self, key):
        # _guard_acl raises only if the given flag is set on this node
        if key in self._self_RO_FORBIDDEN:
            self._guard_acl(NodeAcl.read_only, key)
        if key == "get":
            self._guard_acl(NodeAcl.skel_only, key)

        return getattr(self.__wrapped__, key)

    # reading data is refused if the node is marked as skel_only
    def __getitem__(self, *args, **kwargs):
        self._guard_acl(NodeAcl.skel_only, "__getitem__")
        return self.__wrapped__.__getitem__(*args, **kwargs)

    # writing data is refused if the node is marked as read_only
    def __setitem__(self, *args, **kwargs):
        self._guard_acl(NodeAcl.read_only, "__setitem__")
        return self.__wrapped__.__setitem__(*args, **kwargs)
294# TODO: can this be done somehow with wrapt.decorator but still without boilerplate?
295# problem is it wants a function, but we need to look it up by name first
296# so we hand-roll the decorator for now.
297def _wrap_method(method: str, is_read_only_method: bool = False):
298 """Wrap a method called on a HDF5 entity.
300 Prevents user from creating or deleting reserved entities/names by hand.
301 Ensures that a wrapped Group/Dataset is returned.
303 If is_read_only=False and the object is read_only, refuses to call the method.
304 """
306 def wrapped_method(obj, name, *args, **kwargs):
307 # obj will be the self of the wrapper instance
308 obj._guard_path(name)
309 if not is_read_only_method:
310 obj._guard_acl(NodeAcl.read_only, method)
311 ret = getattr(obj.__wrapped__, method)(name, *args, **kwargs) # RAW
312 return obj._wrap_if_node(ret)
314 return wrapped_method
# h5py classes of reference-like values (links and references);
# these are not supported
_H5_REF_TYPES = [
    h5py.HardLink,
    h5py.SoftLink,
    h5py.ExternalLink,
    h5py.h5r.Reference,
]
class MetadorGroup(MetadorNode):
    """Wrapper for a HDF5 Group."""

    __wrapped__: H5GroupLike

    def _destroy_meta(self, _unlink: bool = True):
        """Destroy all attached metadata at and below this node (recursively)."""
        super()._destroy_meta(_unlink=_unlink)  # this node
        for child in self.values():  # recurse
            child._destroy_meta(_unlink=_unlink)

    # these access entities in read-only way:

    get = _wrap_method("get", is_read_only_method=True)
    __getitem__ = _wrap_method("__getitem__", is_read_only_method=True)

    # these just create new entities with no metadata attached:

    create_group = _wrap_method("create_group")
    require_group = _wrap_method("require_group")
    create_dataset = _wrap_method("create_dataset")
    require_dataset = _wrap_method("require_dataset")

    def __setitem__(self, name, value):
        """Assign `value` at `name`, rejecting h5py link/reference objects."""
        if isinstance(value, tuple(_H5_REF_TYPES)):
            raise ValueError(f"Unsupported reference type: {type(value).__name__}")

        return _wrap_method("__setitem__")(self, name, value)

    # following all must be filtered to hide metador-specific structures:

    # must wrap nodes passed into the callback function and filter visited names
    def visititems(self, func):
        """Call `func(name, node)` with wrapped nodes, skipping Metador-internal ones."""

        def wrapped_func(name, node):
            if M.is_internal_path(node.name):
                return  # skip path/node
            return func(name, self._wrap_if_node(node))

        return self.__wrapped__.visititems(wrapped_func)  # RAW

    # paths passed to visit also must be filtered, so must override this one too
    def visit(self, func):
        """Call `func(name)` for each visited path, skipping Metador-internal ones."""

        def wrapped_func(name, _):
            return func(name)

        return self.visititems(wrapped_func)

    # following also depend on the filtered sequence, directly
    # filter the items, derive other related functions based on that

    def items(self):
        """Yield (name, wrapped node) pairs, hiding Metador-internal entries."""
        for k, v in self.__wrapped__.items():
            if v is None:
                # NOTE: e.g. when nodes are deleted/moved during iteration,
                # v can suddenly be None -> we need to catch this case!
                continue
            if not M.is_internal_path(v.name):
                yield (k, self._wrap_if_node(v))

    def values(self):
        """Iterate over the wrapped child nodes (filtered like `items`)."""
        return (node for _, node in self.items())

    def keys(self):
        """Iterate over the child names (filtered like `items`)."""
        return (key for key, _ in self.items())

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        # count the filtered entries without building an intermediate list
        return sum(1 for _ in self.items())

    def __contains__(self, name: str):
        """Check whether `name` (possibly a multi-segment path) exists below this group.

        Empty path segments (duplicate or trailing slashes) are ignored.
        A path that would have to pass through a non-group node, or that
        has no segments at all, is reported as not contained.
        """
        self._guard_path(name)
        # absolute path on a non-root group -> resolve from the root instead
        if name[:1] == "/" and self.name != "/":
            return name in self["/"]

        segs = [seg for seg in name.split("/") if seg]
        if not segs:
            return False
        if len(segs) == 1:
            return segs[0] in self.keys()

        # descend only into groups; checked by type and not by truthiness,
        # which a proxy delegates to the wrapped object
        nxt = self.get(segs[0])
        if not isinstance(nxt, MetadorGroup):
            return False
        return "/".join(segs[1:]) in nxt

    # these we can take care of but are a bit more tricky to think through

    def __delitem__(self, name: str):
        """Delete the node at `name` together with the metadata attached to it."""
        self._guard_acl(NodeAcl.read_only, "__delitem__")
        self._guard_path(name)

        node = self[name]
        # clean up metadata (recursively, if a group)
        node._destroy_meta()
        # kill the actual data
        return _wrap_method("__delitem__")(self, name)

    def move(self, source: str, dest: str):
        """Move the node at `source` to `dest`, taking its metadata along."""
        self._guard_acl(NodeAcl.read_only, "move")
        self._guard_path(source)
        self._guard_path(dest)

        src_metadir = self[source].meta._base_dir
        # if actual data move fails, an exception will prevent the rest
        self.__wrapped__.move(source, dest)  # RAW

        # if we're here, no problems -> proceed with moving metadata
        dst_node = self[dest]
        if isinstance(dst_node, MetadorDataset):
            dst_metadir = dst_node.meta._base_dir
            # dataset has its metadata stored in parallel -> need to take care of it
            meta_base = dst_metadir
            if src_metadir in self.__wrapped__:  # RAW
                self.__wrapped__.move(src_metadir, dst_metadir)  # RAW
        else:
            # directory where to fix up metadata object TOC links
            # when a group was moved, all metadata is contained in dest -> search it
            meta_base = dst_node.name

        # re-link metadata object TOC links
        if meta_base_node := self.__wrapped__.get(meta_base):
            assert isinstance(meta_base_node, H5GroupLike)
            missing = self._self_container.metador._links.find_missing(meta_base_node)
            self._self_container.metador._links.repair_missing(missing, update=True)

    def copy(
        self,
        source: Union[str, MetadorGroup, MetadorDataset],
        dest: Union[str, MetadorGroup],
        **kwargs,
    ):
        """Copy `source` to `dest`, by default including attached metadata.

        Supported keyword arguments:
            name: name of the copy, used only if `dest` is a group
                (default: name of the source node)
            without_attrs: passed on to the underlying copy (default: False)
            without_meta: if True, the copy gets no metadata (default: False)

        Raises ValueError for unsupported argument types or unknown
        keyword arguments.
        """
        self._guard_acl(NodeAcl.read_only, "copy")

        # get source node and its name without the path and its type
        src_node: MetadorNode
        if isinstance(source, str):
            self._guard_path(source)
            src_node = self[source]
        elif isinstance(source, MetadorNode):
            src_node = source
        else:
            raise ValueError("Copy source must be path, Group or Dataset!")
        src_is_dataset: bool = isinstance(src_node, MetadorDataset)
        src_name: str = src_node.name.split("/")[-1]
        # user can override name at target
        dst_name: str = kwargs.pop("name", src_name)

        # fix up target path
        dst_path: str
        if isinstance(dest, str):
            self._guard_path(dest)
            dst_path = dest
        elif isinstance(dest, MetadorGroup):
            dst_path = dest.name + f"/{dst_name}"
        else:
            raise ValueError("Copy dest must be path or Group!")

        # get other allowed options
        without_attrs: bool = kwargs.pop("without_attrs", False)
        without_meta: bool = kwargs.pop("without_meta", False)
        if kwargs:
            raise ValueError(f"Unknown keyword arguments: {kwargs}")

        # perform copy
        copy_kwargs = {
            "name": None,
            "shallow": False,
            "expand_soft": True,
            "expand_external": True,
            "expand_refs": True,
            "without_attrs": without_attrs,
        }
        self.__wrapped__.copy(source, dst_path, **copy_kwargs)  # RAW
        dst_node = self[dst_path]  # exists now

        if src_is_dataset and not without_meta:
            # because metadata lives in parallel group, need to copy separately:
            src_meta: str = src_node.meta._base_dir
            dst_meta: str = dst_node.meta._base_dir  # node will not exist yet
            self.__wrapped__.copy(src_meta, dst_meta, **copy_kwargs)  # RAW

            # register in TOC:
            dst_meta_node = self.__wrapped__[dst_meta]
            assert isinstance(dst_meta_node, H5GroupLike)
            missing = self._self_container.metador._links.find_missing(dst_meta_node)
            self._self_container.metador._links.repair_missing(missing)

        if not src_is_dataset:
            if without_meta:
                # need to destroy copied metadata copied with the source group
                # but keep TOC links (they point to original copy!)
                dst_node._destroy_meta(_unlink=False)
            else:
                # register copied metadata objects under new uuids
                missing = self._self_container.metador._links.find_missing(dst_node)
                self._self_container.metador._links.repair_missing(missing)

    def __getattr__(self, key):
        # anything of the wrapped object that is not explicitly wrapped by
        # this class is refused instead of being passed through
        if hasattr(self.__wrapped__, key):
            raise UnsupportedOperationError(key)  # deliberately unsupported
        else:
            msg = f"'{type(self).__name__}' object has no attribute '{key}'"
            raise AttributeError(msg)
525# ----
class MetadorContainer(MetadorGroup):
    """Wrapper class adding Metador container interface to h5py.File-like objects.

    The wrapper ensures that any actions done to IH5Records through this interface
    also work with plain h5py.Files.

    There are no guarantees about behaviour with h5py methods not supported by IH5Records.

    Given `old: MetadorContainer`, `MetadorContainer(old.data_source, driver=old.data_driver)`
    should be able to construct another object to access the same data (assuming it is not locked).
    """

    __wrapped__: H5FileLike

    # attributes of the wrapped file-like object that are passed through
    _self_SUPPORTED: Set[str] = {"mode", "flush", "close"}

    # ---- new container-level interface ----

    _self_toc: MetadorContainerTOC

    @property
    def metador(self) -> MetadorContainerTOC:
        """Access interface to Metador metadata object index."""
        return self._self_toc

    def __init__(
        self,
        name_or_obj: Union[MetadorDriver, Any],
        mode: Optional[OpenMode] = "r",
        *,
        # NOTE: driver takes class instead of enum to also allow subclasses
        driver: Optional[Type[MetadorDriver]] = None,
    ):
        """Initialize a MetadorContainer instance from file(s) or a supported object.

        The `mode` argument is ignored when simply wrapping an object.

        If a data source such as a path is passed, will instantiate the object first,
        using the default H5File driver or the passed `driver` keyword argument.
        """
        # obtain the h5file-like object and wrap it (sets self.__wrapped__);
        # the container acts as its own container reference
        h5file = to_h5filelike(name_or_obj, mode, driver=driver)
        super().__init__(self, h5file)
        # initialize metador-specific stuff
        self._self_toc = MetadorContainerTOC(self)

    # not clear if we want these in the public interface. keep this private for now:

    # def _find_orphan_meta(self) -> List[str]:
    #     """Return list of paths to metadata that has no corresponding user node anymore."""
    #     ret: List[str] = []

    #     def collect_orphans(name: str):
    #         if M.is_meta_base_path(name):
    #             if M.to_data_node_path(name) not in self:
    #                 ret.append(name)

    #     self.__wrapped__.visit(collect_orphans)
    #     return ret

    # def _repair(self, remove_orphans: bool = False):
    #     """Repair container structure on best-effort basis.

    #     This will ensure that the TOC points to existing metadata objects
    #     and that all metadata objects are listed in the TOC.

    #     If remove_orphans is set, will erase metadata not belonging to an existing node.

    #     Notice that missing schema plugin dependency metadata cannot be restored.
    #     """
    #     if remove_orphans:
    #         for path in self._find_orphan_meta():
    #             del self.__wrapped__[path]
    #     self.toc._links.find_broken(repair=True)
    #     missing = self.toc._links._find_missing("/")
    #     self.toc._links.repair_missing(missing)

    # ---- pass through HDF5 group methods to a wrapped root group instance ----

    def __getattr__(self, key: str):
        if key not in self._self_SUPPORTED:
            return super().__getattr__(key)  # ask group for method
        return getattr(self.__wrapped__, key)

    # context manager: return the wrapper back, not the raw thing:

    def __enter__(self):
        self.__wrapped__.__enter__()
        return self

    def __exit__(self, *args):
        return self.__wrapped__.__exit__(*args)

    # also list the names defined directly on this class:

    def __dir__(self):
        names = set(super().__dir__())
        return list(names.union(type(self).__dict__.keys()))

    # make wrapper transparent:

    def __repr__(self) -> str:
        return repr(self.__wrapped__)  # shows that its a File, not just a Group