Coverage for src/metador_core/container/interface.py: 88%
511 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
1from __future__ import annotations
3import json
4from dataclasses import dataclass
5from enum import Enum, auto
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Dict,
10 ItemsView,
11 Iterator,
12 KeysView,
13 List,
14 Optional,
15 Set,
16 Tuple,
17 Type,
18 TypeVar,
19 Union,
20 ValuesView,
21 cast,
22 overload,
23)
24from uuid import UUID, uuid1
26from typing_extensions import TypeAlias
28from ..plugin.types import EPName, from_ep_name, plugin_args, to_ep_name
29from ..plugins import schemas
30from ..schema import MetadataSchema
31from ..schema.plugins import PluginPkgMeta, PluginRef
32from ..schema.types import SemVerTuple
33from ..util.types import H5DatasetLike, H5FileLike, H5GroupLike
34from . import utils as M
35from .drivers import (
36 METADOR_DRIVERS,
37 MetadorDriver,
38 MetadorDriverEnum,
39 get_driver_type,
40 get_source,
41)
43if TYPE_CHECKING:
44 from .wrappers import MetadorContainer, MetadorNode
# Type variable ranging over concrete MetadataSchema subclasses
# (lets MetadorMeta.get()/__getitem__ return precisely-typed instances).
S = TypeVar("S", bound=MetadataSchema)
class NodeAcl(Enum):
    """Metador node soft access control flags.

    Soft means - they can be bypassed, it is about trying to prevent errors.

    Group nodes inherit their ACL flags to child nodes.
    """

    # NOTE: maybe refactor this to IntFlag? Then e.g. restrict() interface can be like:
    # node.restrict(acl=NodeAcl.read_only | NodeAcl.local_only)

    read_only = auto()
    """Forbid calling methods mutating contents of (meta)data."""

    local_only = auto()
    """Forbid access to parents beyond the initial local node."""

    skel_only = auto()
    """Forbid reading datasets and metadata, only existence can be checked."""
# Mapping from each ACL flag to whether it is enabled for a given node.
NodeAclFlags: TypeAlias = Dict[NodeAcl, bool]
@dataclass
class StoredMetadata:
    """Information about a metadata schema instance stored at a node."""

    uuid: UUID
    """UUID identifying the metadata object in the container.

    Used for bookkeeping, i.e. keeping the container TOC in sync.
    """

    schema: PluginRef
    """Schema the object is an instance of."""

    node: H5DatasetLike
    """Node with serialized metadata object."""

    def to_path(self):
        """Return the canonical dataset path of this metadata object.

        (E.g. to return canonical path for copying TOC link nodes)
        """
        ep_name = to_ep_name(self.schema.name, self.schema.version)
        return f"{self.node.parent.name}/{ep_name}={self.uuid}"

    @staticmethod
    def from_node(obj: H5DatasetLike) -> StoredMetadata:
        """Instantiate info about a stored metadata node.

        The final path segment encodes schema name, version and object UUID
        as '<ep_name>=<uuid>'.
        """
        leaf = obj.name.lstrip("/").split("/")[-1]
        ep_name, uuid_str = leaf.split("=")
        name, version = from_ep_name(EPName(ep_name))
        return StoredMetadata(
            uuid=UUID(uuid_str),
            schema=schemas.PluginRef(name=name, version=version),
            node=obj,
        )
def _schema_ref_for(ep_name: str) -> PluginRef:
    """Convert an entry-point style name into a schema PluginRef."""
    name, version = from_ep_name(EPName(ep_name))
    return schemas.PluginRef(name=name, version=version)
def _ep_name_for(s_ref: PluginRef) -> str:
    """Serialize a schema PluginRef as an entry-point style name string."""
    return to_ep_name(s_ref.name, s_ref.version)
class MetadorMeta:
    """Interface to Metador metadata objects stored at a single HDF5 node."""

    # helpers for __getitem__ and __setitem__

    @staticmethod
    def _require_schema(
        schema_name: str, schema_ver: Optional[SemVerTuple]
    ) -> Type[MetadataSchema]:
        """Return compatible installed schema class, if possible.

        Raises KeyError if no suitable schema was found.

        Raises TypeError if an auxiliary schema is requested.
        """
        schema_class = schemas._get_unsafe(
            schema_name, schema_ver
        )  # can raise KeyError
        if schema_class.Plugin.auxiliary:  # reject auxiliary schemas in container
            msg = f"Cannot attach instances of auxiliary schema '{schema_name}' to a node!"
            raise TypeError(msg)
        return schema_class

    @staticmethod
    def _parse_obj(
        schema: Type[S], obj: Union[str, bytes, Dict[str, Any], MetadataSchema]
    ) -> S:
        """Return original object if it is an instance of passed schema, or else parse it.

        Raises ValidationError if parsing fails.
        """
        if isinstance(obj, schema):
            return obj  # skip validation, already correct model!
        # try to convert/parse it:
        if isinstance(obj, (str, bytes)):
            return schema.parse_raw(obj)
        if isinstance(obj, MetadataSchema):
            # instance of a different schema: round-trip through a dict
            # to coerce it into the target schema (with validation)
            return schema.parse_obj(obj.dict())
        else:  # dict
            return schema.parse_obj(obj)

    # raw getters and setters don't care about the environment,
    # they work only based on what objects are available and compatible
    # and do not perform validation etc.

    def _get_raw(
        self, schema_name: str, version: Optional[SemVerTuple] = None
    ) -> Optional[StoredMetadata]:
        """Return stored metadata for given schema at this node (or None).

        If a version is passed, the stored version must also be compatible.
        """
        # retrieve stored instance (if suitable)
        ret: Optional[StoredMetadata] = self._objs.get(schema_name)
        if not version:
            return ret  # no specified version -> anything goes
        # otherwise: only return if it is compatible
        req_ref = schemas.PluginRef(name=schema_name, version=version)
        return ret if ret and req_ref.supports(ret.schema) else None

    def _set_raw(self, schema_ref: PluginRef, obj: MetadataSchema) -> None:
        """Store metadata object as instance of passed schema at this node."""
        # reserve UUID, construct dataset path and store metadata object
        obj_uuid = self._mc.metador._links.fresh_uuid()
        obj_path = f"{self._base_dir}/{_ep_name_for(schema_ref)}={str(obj_uuid)}"
        # store object
        self._mc.__wrapped__[obj_path] = bytes(obj)
        obj_node = self._mc.__wrapped__[obj_path]
        assert isinstance(obj_node, H5DatasetLike)
        stored_obj = StoredMetadata(uuid=obj_uuid, schema=schema_ref, node=obj_node)
        # FIX: _objs is keyed by schema *name* (see __init__, _get_raw, _del_raw).
        # Previously the PluginRef itself was used as key, making the fresh
        # object invisible to _get_raw and breaking a later _del_raw.
        self._objs[schema_ref.name] = stored_obj
        # update TOC
        self._mc.metador._links.register(stored_obj)
        return

    def _del_raw(self, schema_name: str, *, _unlink: bool = True) -> None:
        """Delete stored metadata for given schema at this node."""
        # NOTE: _unlink is only for the destroy method
        stored_obj = self._objs[schema_name]
        # unregister in TOC (will also trigger clean up there)
        if _unlink:
            self._mc.metador._links.unregister(stored_obj.uuid)
        # remove metadata object
        del self._objs[stored_obj.schema.name]
        del self._mc.__wrapped__[stored_obj.node.name]
        # no metadata objects left -> remove metadata dir
        if not self._objs:
            del self._mc.__wrapped__[self._base_dir]
        return

    # helpers for container-level operations (move, copy, delete etc)

    def _destroy(self, *, _unlink: bool = True):
        """Unregister and delete all metadata objects attached to this node."""
        # NOTE: _unlink is only set to false for node copy without metadata
        # (iterate over a copy, _del_raw mutates self._objs)
        for schema_name in list(self.keys()):
            self._del_raw(schema_name, _unlink=_unlink)

    # ----

    def __init__(self, node: MetadorNode):
        self._mc: MetadorContainer = node._self_container
        """Underlying container (for convenience)."""

        self._node: MetadorNode = node
        """Underlying actual user node."""

        is_dataset = isinstance(node, H5DatasetLike)
        self._base_dir: str = M.to_meta_base_path(node.name, is_dataset)
        """Path of this metador metadata group node.

        Actual node exists iff any metadata is stored for the node.
        """

        self._objs: Dict[str, StoredMetadata] = {}
        """Information about available metadata objects (keyed by schema name)."""

        # load available object metadata encoded in the node names
        meta_grp = cast(H5GroupLike, self._mc.__wrapped__.get(self._base_dir, {}))
        for obj_node in meta_grp.values():
            assert isinstance(obj_node, H5DatasetLike)
            obj = StoredMetadata.from_node(obj_node)
            self._objs[obj.schema.name] = obj

    # ----

    def keys(self) -> KeysView[str]:
        """Return names of explicitly attached metadata objects.

        Transitive parent schemas are not included.
        """
        return self._objs.keys()

    def values(self) -> ValuesView[StoredMetadata]:
        """Return info objects of explicitly attached metadata (ACL-guarded)."""
        self._node._guard_acl(NodeAcl.skel_only)
        return self._objs.values()

    def items(self) -> ItemsView[str, StoredMetadata]:
        """Return (schema name, info) pairs of attached metadata (ACL-guarded)."""
        self._node._guard_acl(NodeAcl.skel_only)
        return self._objs.items()

    # ----

    def __len__(self) -> int:
        """Return number of explicitly attached metadata objects.

        Transitive parent schemas are not counted.
        """
        return len(self.keys())

    def __iter__(self) -> Iterator[str]:
        """Iterate listing schema names of all actually attached metadata objects.

        Transitive parent schemas are not included.
        """
        return iter(self.keys())

    # ----

    def query(
        self,
        schema: Union[
            str, Tuple[str, Optional[SemVerTuple]], PluginRef, Type[MetadataSchema]
        ] = "",
        version: Optional[SemVerTuple] = None,
    ) -> Iterator[PluginRef]:
        """Return schema names for which objects at this node are compatible with passed schema.

        Will also consider compatible child schema instances.

        Returned iterator will yield passed schema first, if an object is available.
        Apart from this, the order is not specified.
        """
        schema_name, schema_ver = plugin_args(schema, version)
        # no schema selected -> list everything
        if not schema_name:
            for obj in self.values():
                yield obj.schema
            return

        # try exact schema (in any compatible version, if version specified)
        if obj := self._get_raw(schema_name, schema_ver):
            yield obj.schema

        # next, try compatible child schemas of compatible versions of requested schema
        compat = set().union(
            *(
                self._mc.metador.schemas.children(ref)
                for ref in self._mc.metador.schemas.versions(schema_name, schema_ver)
            )
        )
        # refs of all objects stored here (keys() are exactly the stored names,
        # so _get_raw without version never returns None here)
        avail = {self._get_raw(s).schema for s in self.keys()}
        for s_ref in avail.intersection(compat):
            yield s_ref

    def __contains__(
        self,
        schema: Union[
            str, Tuple[str, Optional[SemVerTuple]], PluginRef, Type[MetadataSchema]
        ],
    ) -> bool:
        """Check whether a compatible metadata object for given schema exists.

        Will also consider compatible child schema instances.
        """
        if schema == "" or isinstance(schema, tuple) and schema[0] == "":
            return False  # empty query lists everything, here the logic is inverted!
        return next(self.query(schema), None) is not None

    @overload
    def __getitem__(self, schema: str) -> MetadataSchema:
        ...

    @overload
    def __getitem__(self, schema: Type[S]) -> S:
        ...

    def __getitem__(self, schema: Union[str, Type[S]]) -> Union[S, MetadataSchema]:
        """Like get, but will raise KeyError on failure."""
        if ret := self.get(schema):
            return ret
        raise KeyError(schema)

    @overload
    def get(
        self, schema: str, version: Optional[SemVerTuple] = None
    ) -> Optional[MetadataSchema]:
        ...

    @overload
    def get(
        self, schema: Type[S], version: Optional[SemVerTuple] = None
    ) -> Optional[S]:
        ...

    def get(
        self, schema: Union[str, Type[S]], version: Optional[SemVerTuple] = None
    ) -> Optional[Union[MetadataSchema, S]]:
        """Get a parsed metadata object matching the given schema (if it exists).

        Will also consider compatible child schema instances.
        """
        self._node._guard_acl(NodeAcl.skel_only)

        # normalize arguments
        schema_name, schema_ver = plugin_args(schema, version)

        # get a compatible schema instance that is available at this node
        compat_schema = next(self.query(schema_name, schema_ver), None)
        if not compat_schema:
            return None  # not found

        # get class of schema and parse object
        schema_class = self._require_schema(schema_name, schema_ver)
        if obj := self._get_raw(compat_schema.name, compat_schema.version):
            return cast(S, self._parse_obj(schema_class, obj.node[()]))
        return None

    def __setitem__(
        self, schema: Union[str, Type[S]], value: Union[Dict[str, Any], MetadataSchema]
    ) -> None:
        """Store metadata object as instance of given schema.

        Raises KeyError if passed schema is not installed in environment.

        Raises TypeError if passed schema is marked auxiliary.

        Raises ValueError if an object for the schema already exists.

        Raises ValidationError if passed object is not valid for the schema.
        """
        self._node._guard_acl(NodeAcl.read_only)
        schema_name, schema_ver = plugin_args(schema)

        # if self.get(schema_name, schema_ver): # <- also subclass schemas
        # NOTE: for practical reasons let's be more lenient here and allow redundancy
        # hence only check if exact schema (modulo version) is already there
        if self._get_raw(schema_name):  # <- only same schema
            msg = f"Metadata object for schema {schema_name} already exists!"
            raise ValueError(msg)

        schema_class = self._require_schema(schema_name, schema_ver)
        checked_obj = self._parse_obj(schema_class, value)
        self._set_raw(schema_class.Plugin.ref(), checked_obj)

    def __delitem__(self, schema: Union[str, Type[MetadataSchema]]) -> None:
        """Delete metadata object explicitly stored for the passed schema.

        If a schema class is passed, its version is ignored,
        as each node may contain at most one explicit instance per schema.

        Raises KeyError if no metadata object for that schema exists.
        """
        self._node._guard_acl(NodeAcl.read_only)
        schema_name, _ = plugin_args(schema)
        if self._get_raw(schema_name) is None:
            raise KeyError(schema_name)  # no (explicit) metadata object
        self._del_raw(schema_name)
422# ----
class TOCLinks:
    """Link management for synchronizing metadata objects and container TOC."""

    # NOTE: This is not exposed to the end-user

    @staticmethod
    def _link_path_for(schema_ref: PluginRef) -> str:
        # TOC group that collects the links for instances of one schema
        return f"{M.METADOR_LINKS_PATH}/{_ep_name_for(schema_ref)}"

    def __init__(self, raw_cont: H5FileLike, toc_schemas: TOCSchemas):
        """Initialize link manager, loading existing links from the container."""
        self._raw: H5FileLike = raw_cont
        """Raw underlying container (for quick access)."""

        self._toc_schemas = toc_schemas
        """Schemas used in container (to (un)register)."""

        # value None = UUID reserved by fresh_uuid but link not created yet
        self._toc_path: Dict[UUID, Optional[str]] = {}
        """Maps metadata object UUIDs to paths of respective pseudo-symlink in TOC."""

        # load links into memory
        # (layout: METADOR_LINKS_PATH/<schema ep name>/<uuid> -> link dataset)
        if M.METADOR_LINKS_PATH in self._raw:
            link_grp = self._raw.require_group(M.METADOR_LINKS_PATH)
            assert isinstance(link_grp, H5GroupLike)
            for schema_link_grp in link_grp.values():
                assert isinstance(schema_link_grp, H5GroupLike)
                for uuid, link_node in schema_link_grp.items():
                    assert isinstance(link_node, H5DatasetLike)
                    self._toc_path[UUID(uuid)] = link_node.name

    def fresh_uuid(self) -> UUID:
        """Return a UUID string not used for a metadata object in the container yet."""
        fresh = False
        ret: UUID
        # NOTE: here a very unlikely race condition is present if parallelized
        while not fresh:
            ret = uuid1()
            fresh = ret not in self._toc_path
        # not assigned yet, but "reserved" (register() fills in the real path)
        self._toc_path[ret] = None
        # ----
        return ret

    def resolve(self, uuid: UUID) -> str:
        """Get the path a UUID in the TOC points to.

        Raises KeyError for unknown UUIDs; the caller must ensure the UUID
        has actually been registered (a merely reserved UUID has no link node).
        """
        link_path = self._toc_path[uuid]
        link_node = cast(H5DatasetLike, self._raw[link_path])
        # the link dataset stores the target path as UTF-8 bytes
        return link_node[()].decode("utf-8")

    def update(self, uuid: UUID, new_target: str):
        """Update target of an existing link to point to a new location."""
        link_path = self._toc_path[uuid]
        # delete + recreate, as the dataset value cannot be overwritten in-place
        del self._raw[link_path]
        self._raw[link_path] = new_target

    def register(self, obj: StoredMetadata) -> None:
        """Create a link for a metadata object in container TOC.

        The link points to the metadata object.
        """
        # make sure schema (and its provider info) is tracked in the TOC
        self._toc_schemas._register(obj.schema)

        toc_path = f"{self._link_path_for(obj.schema)}/{obj.uuid}"
        self._toc_path[obj.uuid] = toc_path
        self._raw[toc_path] = str(obj.node.name)

    def unregister(self, uuid: UUID) -> None:
        """Unregister metadata object in TOC given its UUID.

        Will remove the object and clean up empty directories in the TOC.
        """
        # delete the link itself and free the UUID
        toc_path = self._toc_path[uuid]

        schema_group = self._raw[toc_path].parent
        assert isinstance(schema_group, H5GroupLike)
        link_group = schema_group.parent
        assert link_group.name == M.METADOR_LINKS_PATH

        del self._raw[toc_path]
        del self._toc_path[uuid]
        if len(schema_group):
            return  # schema still has instances

        s_name_vers: str = schema_group.name.split("/")[-1]
        # delete empty group for schema
        del self._raw[schema_group.name]
        # notify schema manager (cleans up schema + package info)
        self._toc_schemas._unregister(_schema_ref_for(s_name_vers))

        if len(link_group.keys()):
            return  # container still has metadata
        else:
            # remove the link dir itself (no known metadata in container left)
            del self._raw[link_group.name]

    # ----

    def find_broken(self, repair: bool = False) -> List[UUID]:
        """Return list of UUIDs in TOC not pointing to an existing metadata object.

        Will use loaded cache of UUIDs and check them, without scanning the container.

        If repair is set, will remove those broken links.
        """
        # NOTE(review): resolve() would fail on a merely reserved UUID
        # (path None); presumably this is never called mid-reservation - verify.
        broken = []
        for uuid in self._toc_path.keys():
            target = self.resolve(uuid)
            if target not in self._raw:
                broken.append(uuid)
        if repair:
            for uuid in broken:
                self.unregister(uuid)
        return broken

    def find_missing(self, path: H5GroupLike) -> List[H5DatasetLike]:
        """Return list of metadata objects not listed in TOC."""
        missing = []

        def collect_missing(_, node):
            # visititems callback: gather metadata datasets unknown to the TOC
            if not M.is_internal_path(node.name, M.METADOR_META_PREF):
                return  # not a metador metadata path
            if M.is_meta_base_path(node.name):
                # top dir, not a "link dataset",
                # e.g. /.../foo/metador_meta_ or /.../metador_meta_foo
                return

            # now we assume we have a path to a metadata link object in the group
            obj = StoredMetadata.from_node(node)
            known = obj.uuid in self._toc_path
            # check UUID collision: i.e., used in TOC, but points elsewhere
            # (requires fixing up the name of this object / new UUID)
            # implies that THIS object IS missing in the TOC
            collision = known and self.resolve(obj.uuid) != node.name
            if not known or collision:
                missing.append(node)

        # ensure its a group and collect
        self._raw.require_group(path.name).visititems(collect_missing)
        return missing

    def repair_missing(
        self, missing: List[H5DatasetLike], update: bool = False
    ) -> None:
        """Repair links (objects get new UUIDs, unless update is true)."""
        # NOTE: needed for correct copy and move of nodes with their metadata
        for node in missing:
            obj = StoredMetadata.from_node(node)
            if update and obj.uuid in self._toc_path:
                # update target of existing link (e.g. for move)
                self.update(obj.uuid, node.name)
            else:
                # assign new UUID (e.g. for copy)
                # (copied metadata node refers to some other uuid in the name)
                obj.uuid = self.fresh_uuid()
                new_path = obj.to_path()
                # rename the metadata node to point to the new UUID
                self._raw.move(node.name, new_path)
                obj.node = cast(H5DatasetLike, self._raw[new_path])
                # register the object with the new UUID in the TOC
                self.register(obj)
class TOCSchemas:
    """Schema management for schemas used in the container.

    Interface is made to mimic PGSchema wherever it makes sense.
    """

    @classmethod
    def _schema_path_for(cls, s_ref: PluginRef) -> str:
        # container group storing info about one schema (name + version)
        return f"{M.METADOR_SCHEMAS_PATH}/{to_ep_name(s_ref.name, s_ref.version)}"

    @classmethod
    def _jsonschema_path_for(cls, s_ref: PluginRef) -> str:
        # dataset with the serialized JSON Schema of the schema plugin
        return f"{cls._schema_path_for(s_ref)}/jsonschema.json"

    @staticmethod
    def _load_json(node: H5DatasetLike):
        """Decode and parse a JSON document stored in a dataset."""
        return json.loads(node[()].decode("utf-8"))

    def _update_parents_children(
        self, schema_ref: PluginRef, parents: Optional[List[PluginRef]]
    ):
        """Maintain the parent/child compatibility tables.

        If parents is None, schema_ref is being removed, otherwise it is
        being added with the given parent path (ancestors first).
        """
        if parents is None:  # remove schema
            for parent in self._parents[schema_ref]:
                if parent in self._schemas:
                    self._children[parent].remove(schema_ref)
                elif all(
                    (child not in self._schemas for child in self._children[parent])
                ):
                    # parent only existed as an ancestor entry and no used
                    # schema depends on it anymore -> drop its bookkeeping
                    del self._parents[parent]
                    del self._children[parent]
        else:  # add schema
            for i, parent in enumerate(parents):
                if parent not in self._parents:
                    # ancestors of the parent are a prefix of the parent path
                    self._parents[parent] = parents[: i + 1]
                if parent not in self._children:
                    self._children[parent] = set()
                if parent != schema_ref:
                    self._children[parent].add(schema_ref)

    def _register(self, schema_ref: PluginRef):
        """Notify that a schema is used in the container (metadata object is created/updated).

        If the schema has not been used before in the container, will store metadata about it.
        """
        if schema_ref in self._schemas:
            return  # nothing to do

        # store json schema
        schema_cls = schemas.get(schema_ref.name, schema_ref.version)
        jsonschema_dat = schema_cls.schema_json().encode("utf-8")
        jsonschema_path = self._jsonschema_path_for(schema_ref)
        self._raw[jsonschema_path] = jsonschema_dat

        # store parent schema refs
        compat_path = f"{self._schema_path_for(schema_ref)}/compat"
        parents = schemas.parent_path(schema_ref.name, schema_ref.version)
        parents_dat: bytes = json.dumps(list(map(lambda x: x.dict(), parents))).encode(
            "utf-8"
        )

        self._raw[compat_path] = parents_dat
        self._schemas.add(schema_ref)
        self._update_parents_children(schema_ref, parents)

        # add providing package (if no stored package provides it)
        if not self._pkgs._providers.get(schema_ref, []):
            env_pkg_info: PluginPkgMeta = schemas.provider(schema_cls.Plugin.ref())
            pkg_name_ver = (str(env_pkg_info.name), env_pkg_info.version)
            self._pkgs._register(pkg_name_ver, env_pkg_info)
            self._used[pkg_name_ver] = set()

        # update used schemas tracker for all packages providing this schema
        for pkg in self._pkgs._providers[schema_ref]:
            self._used[pkg].add(schema_ref)

    def _unregister(self, schema_ref: PluginRef):
        """Notify that a schema is not used at any container node anymore.

        If after that no schema of a listed dep package is used,
        this dependency will be removed from the container.
        """
        del self._raw[self._schema_path_for(schema_ref)]
        self._schemas.remove(schema_ref)
        self._update_parents_children(schema_ref, None)

        providers = set(self._pkgs._providers[schema_ref])
        for pkg in providers:
            pkg_used = self._used[pkg]
            if schema_ref in pkg_used:
                # remove schema from list of used schemas of pkg
                pkg_used.remove(schema_ref)
            if not len(pkg_used):
                # package not used anymore in container -> clean up
                self._pkgs._unregister(pkg)

        # remove schemas group if it is empty (no schemas used in container)
        if not self._raw.require_group(M.METADOR_SCHEMAS_PATH).keys():
            del self._raw[M.METADOR_SCHEMAS_PATH]

    def __init__(self, raw_cont: H5FileLike, toc_packages: TOCPackages):
        self._raw: H5FileLike = raw_cont
        """Raw underlying container (for quick access)."""

        self._pkgs = toc_packages
        """TOC package metadata manager object."""

        self._schemas: Set[PluginRef] = set()
        """Stored JSON Schemas of actually used schemas."""

        self._parents: Dict[PluginRef, List[PluginRef]] = {}
        """Parents of a used json schema (i.e. other partially compatible schemas)."""

        self._children: Dict[PluginRef, Set[PluginRef]] = {}
        """Children of a used json schema (i.e. other fully compatible schemas)."""

        self._used: Dict[PythonDep, Set[PluginRef]] = {}
        """package name + version -> name of schemas used in container"""

        for pkg in self._pkgs.keys():
            self._used[pkg] = set()

        # restore bookkeeping from info stored in the container, if any
        if M.METADOR_SCHEMAS_PATH in self._raw:
            schema_grp = self._raw.require_group(M.METADOR_SCHEMAS_PATH)
            for name, node in schema_grp.items():
                s_ref: PluginRef = _schema_ref_for(name)
                assert isinstance(node, H5GroupLike)
                compat = node["compat"]
                assert isinstance(compat, H5DatasetLike)

                reflist = json.loads(compat[()].decode("utf-8"))
                parents = list(map(PluginRef.parse_obj, reflist))

                self._schemas.add(s_ref)
                self._update_parents_children(s_ref, parents)
                for pkg in self._pkgs._providers[s_ref]:
                    self._used[pkg].add(s_ref)

    @property
    def packages(self) -> TOCPackages:
        """Like PluginGroup.packages, but with respect to schemas used in container."""
        return self._pkgs

    def provider(self, schema_ref: PluginRef) -> PluginPkgMeta:
        """Like PluginGroup.provider, but with respect to container deps."""
        pkg_name_ver = next(iter(self._pkgs._providers.get(schema_ref, [])), None)
        if pkg_name_ver is None:
            msg = f"Did not find metadata of a package providing schema: '{schema_ref}'"
            raise KeyError(msg)
        return self._pkgs[pkg_name_ver]

    def parent_path(
        self, schema, version: Optional[SemVerTuple] = None
    ) -> List[PluginRef]:
        """Like PGSchema.parent_path, but with respect to container deps."""
        name, vers = plugin_args(schema, version, require_version=True)
        s_ref = schemas.PluginRef(name=name, version=vers)
        return self._parents[s_ref]

    def versions(
        self, p_name: str, version: Optional[SemVerTuple] = None
    ) -> List[PluginRef]:
        """Like PGSchema.versions, but with respect to container deps."""
        # NOTE: using _children instead of _schemas because some are only listed
        # due to their appearance in the parent_path of some actually used schema
        # but we need them here for "parent compatibility" to work right.
        refs = list(filter(lambda s: s.name == p_name, self._children))

        if version is None:
            return refs
        # filter plugins for compatible version
        requested = schemas.PluginRef(name=p_name, version=version)
        # NOTE: here "supports" arguments are reversed (compared to "plugin versions")!
        # because its about instances (that must be "below" the requested schema version)
        return [ref for ref in refs if requested.supports(ref)]

    def children(self, schema, version: Optional[SemVerTuple] = None) -> Set[PluginRef]:
        """Like PGSchema.children, but with respect to container deps."""
        name, vers = plugin_args(schema, version)
        if vers is not None:
            s_refs = [schemas.PluginRef(name=name, version=vers)]
        else:
            # if no version is given, collect all possibilities
            s_refs = [ref for ref in self._children.keys() if ref.name == name]
        # return all that can be actually retrieved
        return set().union(
            *filter(lambda x: x is not None, map(self._children.get, s_refs))
        )

    # ----

    def __len__(self):
        return len(self._schemas)

    def __iter__(self):
        return iter(self.keys())

    def __contains__(self, schema_ref: PluginRef):
        return schema_ref in self._schemas

    def __getitem__(self, schema_ref: PluginRef):
        """Return stored JSON Schema document for given schema ref.

        Raises KeyError if the schema is not stored in the container.
        (FIX: was an `assert`, which raised AssertionError instead of the
        mapping-conventional KeyError, could not be caught by get(), and is
        stripped entirely under `python -O`.)
        """
        node_path = self._jsonschema_path_for(schema_ref)
        if node_path not in self._raw:
            raise KeyError(schema_ref)
        return self._load_json(cast(H5DatasetLike, self._raw[node_path]))

    def get(self, schema_ref: PluginRef):
        """Return stored JSON Schema for given schema ref, or None if absent.

        (FIX: the `return` was missing, so the looked-up document was
        discarded and None was always returned.)
        """
        try:
            return self[schema_ref]
        except KeyError:
            return None

    def keys(self):
        return set(self._schemas)

    def values(self):
        return [self[k] for k in self.keys()]

    def items(self):
        return [(k, self[k]) for k in self.keys()]
# A Python package dependency: (distribution name, semantic version tuple).
PythonDep: TypeAlias = Tuple[str, SemVerTuple]
class TOCPackages:
    """Package metadata management for schemas used in the container.

    The container will always store for each schema used in the
    information about one package providing that schema.

    If there are multiple providers of the same schema,
    the first/existing one is preferred.
    """

    @staticmethod
    def _pkginfo_path_for(pkg_name: str, pkg_version: SemVerTuple) -> str:
        # container path of the dataset storing info about one package
        return f"{M.METADOR_PACKAGES_PATH}/{to_ep_name(pkg_name, pkg_version)}"

    def _add_providers(self, pkg: PythonDep, pkginfo: PluginPkgMeta):
        # record pkg as provider of each schema plugin it ships
        for s_ref in pkginfo.plugins[schemas.name]:
            self._providers.setdefault(s_ref, set()).add(pkg)

    def _register(self, pkg: PythonDep, info: PluginPkgMeta):
        # persist package info in the container and index it in memory
        self._raw[self._pkginfo_path_for(*pkg)] = bytes(info)
        self._pkginfos[pkg] = info
        self._add_providers(pkg, info)

    def _unregister(self, pkg: PythonDep):
        # remove persisted package info and in-memory bookkeeping
        del self._raw[self._pkginfo_path_for(*pkg)]
        info = self._pkginfos.pop(pkg)
        # unregister providers; forget schemas left without any provider
        for s_ref in info.plugins[schemas.name]:
            pkg_set = self._providers[s_ref]
            pkg_set.remove(pkg)
            if not pkg_set:
                del self._providers[s_ref]

        # remove packages group if it is empty (no package info left)
        if not self._raw.require_group(M.METADOR_PACKAGES_PATH).keys():
            del self._raw[M.METADOR_PACKAGES_PATH]

    def __init__(self, raw_container: H5FileLike):
        """Initialize package manager, loading stored package infos (if any)."""
        self._raw: H5FileLike = raw_container
        # package name + version -> package info
        self._pkginfos: Dict[PythonDep, PluginPkgMeta] = {}
        # schema reference -> providing package name + version pairs
        self._providers: Dict[PluginRef, Set[PythonDep]] = {}

        # parse package infos if they exist
        if M.METADOR_PACKAGES_PATH in self._raw:
            pkgs_grp = self._raw.require_group(M.METADOR_PACKAGES_PATH)
            for ep_name, ds in pkgs_grp.items():
                dep: PythonDep = from_ep_name(EPName(ep_name))
                pkg_info = PluginPkgMeta.parse_raw(cast(H5DatasetLike, ds)[()])
                self._pkginfos[dep] = pkg_info
                self._add_providers(dep, pkg_info)

    # ---- read-only mapping-style access to the package infos ----

    def __len__(self):
        return len(self._pkginfos)

    def __iter__(self):
        return iter(self._pkginfos)

    def __contains__(self, pkg: PythonDep):
        return pkg in self._pkginfos

    def __getitem__(self, pkg: PythonDep):
        return self._pkginfos[pkg]

    def keys(self):
        return self._pkginfos.keys()

    def values(self):
        return self._pkginfos.values()

    def items(self):
        return self._pkginfos.items()
class MetadorContainerTOC:
    """Interface to the Metador metadata index (table of contents) of a container."""

    def __init__(self, container: MetadorContainer):
        """Initialize the TOC, creating fresh Metador structures if needed."""
        self._container = container
        self._raw = self._container.__wrapped__

        # version marker present iff the container was already initialized
        ver = self.spec_version if M.METADOR_VERSION_PATH in self._raw else None
        if ver:
            # lexicographic list comparison: reject spec major version >= 2
            if ver >= [2]:
                msg = f"Unsupported Metador container version: {ver}"
                raise ValueError(msg)
        else:
            if self._container.acl[NodeAcl.read_only]:
                msg = "Container is read-only and does not look like a Metador container! "
                msg += "Please open in writable mode to initialize Metador structures!"
                raise ValueError(msg)

            # writable + no version = fresh (for metador), initialize it
            self._raw[M.METADOR_VERSION_PATH] = M.METADOR_SPEC_VERSION
            self._raw[M.METADOR_UUID_PATH] = str(uuid1())

        # if we're here, we have a prepared container TOC structure

        # proceed to initialize TOC
        self._driver_type: MetadorDriverEnum = get_driver_type(self._raw)

        # NOTE: construction order matters - schemas need packages,
        # links need schemas
        self._packages = TOCPackages(self._raw)
        self._schemas = TOCSchemas(self._raw, self._packages)
        self._links = TOCLinks(self._raw, self._schemas)

    # ----

    @property
    def driver_type(self) -> MetadorDriverEnum:
        """Return the type of the container driver."""
        return self._driver_type

    @property
    def driver(self) -> Type[MetadorDriver]:
        """Return the container driver class used by the container."""
        return METADOR_DRIVERS[self.driver_type]

    @property
    def source(self) -> Any:
        """Return data underlying the container (file, set of files, etc. used with the driver)."""
        return get_source(self._raw, self.driver_type)

    # ----

    @property
    def container_uuid(self) -> UUID:
        """Return UUID of the container."""
        uuid = self._raw[M.METADOR_UUID_PATH]
        uuid_ds = cast(H5DatasetLike, uuid)
        return UUID(uuid_ds[()].decode("utf-8"))

    @property
    def spec_version(self) -> List[int]:
        """Return Metador container specification version of the container."""
        # stored as dotted version string, e.g. "1.0" -> [1, 0]
        ver = cast(H5DatasetLike, self._raw[M.METADOR_VERSION_PATH])
        return list(map(int, ver[()].decode("utf-8").split(".")))

    @property
    def schemas(self):
        """Information about all schemas used for metadata objects in this container."""
        return self._schemas

    def query(
        self,
        schema: Union[str, Type[S]],
        version: Optional[SemVerTuple] = None,
        *,
        node: Optional[MetadorNode] = None,
    ) -> Iterator[MetadorNode]:
        """Return nodes that contain a metadata object compatible with the given schema."""
        schema_name, schema_ver = plugin_args(schema, version)
        if not schema_name:  # could be e.g. empty string
            msg = "A schema name, plugin reference or class must be provided!"
            raise ValueError(msg)

        start_node: MetadorNode = node or self._container["/"]

        # check start node metadata explicitly
        # (visititems below only reaches nodes strictly inside the group)
        if (schema_name, schema_ver) in start_node.meta:
            yield start_node

        if not isinstance(start_node, H5GroupLike):
            return  # the node is not group-like, cannot be traversed down

        # collect nodes below start node recursively
        # NOTE: yielding from the collect_nodes does not work :'(
        # so we have to actually materialize the list >.<
        # but we expose only the generator interface anyway (better design)
        # (maybe consider replacing visititems with a custom traversal here)
        ret: List[MetadorNode] = []

        def collect_nodes(_, node: MetadorNode):
            # visititems callback: gather nodes with compatible metadata
            if (schema_name, schema_ver) in node.meta:
                ret.append(node)

        start_node.visititems(collect_nodes)
        yield from iter(ret)