Coverage for src/metador_core/container/interface.py: 88%

511 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-02 09:33 +0000

1from __future__ import annotations 

2 

3import json 

4from dataclasses import dataclass 

5from enum import Enum, auto 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Dict, 

10 ItemsView, 

11 Iterator, 

12 KeysView, 

13 List, 

14 Optional, 

15 Set, 

16 Tuple, 

17 Type, 

18 TypeVar, 

19 Union, 

20 ValuesView, 

21 cast, 

22 overload, 

23) 

24from uuid import UUID, uuid1 

25 

26from typing_extensions import TypeAlias 

27 

28from ..plugin.types import EPName, from_ep_name, plugin_args, to_ep_name 

29from ..plugins import schemas 

30from ..schema import MetadataSchema 

31from ..schema.plugins import PluginPkgMeta, PluginRef 

32from ..schema.types import SemVerTuple 

33from ..util.types import H5DatasetLike, H5FileLike, H5GroupLike 

34from . import utils as M 

35from .drivers import ( 

36 METADOR_DRIVERS, 

37 MetadorDriver, 

38 MetadorDriverEnum, 

39 get_driver_type, 

40 get_source, 

41) 

42 

43if TYPE_CHECKING: 

44 from .wrappers import MetadorContainer, MetadorNode 

45 

# Type variable bound to MetadataSchema, used to give get()/__getitem__ precise return types
S = TypeVar("S", bound=MetadataSchema)

47 

48 

class NodeAcl(Enum):
    """Soft access control flags for a Metador node.

    "Soft" means the flags can be bypassed on purpose; they exist to
    prevent accidental mistakes, not to enforce security.

    Group nodes inherit their ACL flags to child nodes.
    """

    # NOTE: maybe refactor this to IntFlag? Then e.g. restrict() interface can be like:
    # node.restrict(acl=NodeAcl.read_only | NodeAcl.local_only)

    read_only = auto()
    """Forbid calling methods mutating contents of (meta)data."""

    local_only = auto()
    """Forbid access to parents beyond the initial local node."""

    skel_only = auto()
    """Forbid reading datasets and metadata, only existence can be checked."""

68 

69 

# Mapping from each ACL flag to whether it is enabled for a node
NodeAclFlags: TypeAlias = Dict[NodeAcl, bool]

71 

72 

@dataclass
class StoredMetadata:
    """Information about a metadata schema instance stored at a node."""

    uuid: UUID
    """UUID identifying the metadata object in the container.

    Used for bookkeeping, i.e. keeping the container TOC in sync.
    """

    schema: PluginRef
    """Schema the object is an instance of."""

    node: H5DatasetLike
    """Node with serialized metadata object."""

    def to_path(self):
        """Return path of metadata object.

        (E.g. to return canonical path for copying TOC link nodes)
        """
        dir_path = self.node.parent.name
        entry_name = to_ep_name(self.schema.name, self.schema.version)
        return f"{dir_path}/{entry_name}={self.uuid}"

    @staticmethod
    def from_node(obj: H5DatasetLike) -> StoredMetadata:
        """Instantiate info about a stored metadata node."""
        # node name looks like .../<ep_name>=<uuid>; decode the last segment
        leaf = obj.name.lstrip("/").split("/")[-1]
        ep_name, uuid_str = leaf.split("=")
        schema_name, schema_version = from_ep_name(EPName(ep_name))
        ref = schemas.PluginRef(name=schema_name, version=schema_version)
        return StoredMetadata(uuid=UUID(uuid_str), schema=ref, node=obj)

108 

109 

def _schema_ref_for(ep_name: str) -> PluginRef:
    """Build a schema PluginRef from an entry-point style name string."""
    name, version = from_ep_name(EPName(ep_name))
    return schemas.PluginRef(name=name, version=version)

113 

114 

def _ep_name_for(s_ref: PluginRef) -> str:
    """Serialize a schema PluginRef into an entry-point style name string."""
    name, version = s_ref.name, s_ref.version
    return to_ep_name(name, version)

117 

118 

class MetadorMeta:
    """Interface to Metador metadata objects stored at a single HDF5 node.

    Objects are tracked in `self._objs`, which is keyed by schema *name*
    (a string) — this invariant is relied on by `_get_raw`, `_del_raw`,
    `query` and `__init__`.
    """

    # helpers for __getitem__ and __setitem__

    @staticmethod
    def _require_schema(
        schema_name: str, schema_ver: Optional[SemVerTuple]
    ) -> Type[MetadataSchema]:
        """Return compatible installed schema class, if possible.

        Raises KeyError if no suitable schema was found.

        Raises TypeError if an auxiliary schema is requested.
        """
        schema_class = schemas._get_unsafe(
            schema_name, schema_ver
        )  # can raise KeyError
        if schema_class.Plugin.auxiliary:  # reject auxiliary schemas in container
            msg = f"Cannot attach instances of auxiliary schema '{schema_name}' to a node!"
            raise TypeError(msg)
        return schema_class

    @staticmethod
    def _parse_obj(
        schema: Type[S], obj: Union[str, bytes, Dict[str, Any], MetadataSchema]
    ) -> S:
        """Return original object if it is an instance of passed schema, or else parse it.

        Raises ValidationError if parsing fails.
        """
        if isinstance(obj, schema):
            return obj  # skip validation, already correct model!
        # try to convert/parse it:
        if isinstance(obj, (str, bytes)):
            return schema.parse_raw(obj)
        if isinstance(obj, MetadataSchema):
            # other schema instance -> re-validate its fields against target schema
            return schema.parse_obj(obj.dict())
        else:  # dict
            return schema.parse_obj(obj)

    # raw getters and setters don't care about the environment,
    # they work only based on what objects are available and compatible
    # and do not perform validation etc.

    def _get_raw(
        self, schema_name: str, version: Optional[SemVerTuple] = None
    ) -> Optional[StoredMetadata]:
        """Return stored metadata for given schema at this node (or None).

        If a version is passed, the stored version must also be compatible.
        """
        # retrieve stored instance (if suitable)
        ret: Optional[StoredMetadata] = self._objs.get(schema_name)
        if not version:
            return ret  # no specified version -> anything goes
        # otherwise: only return it if it is compatible
        req_ref = schemas.PluginRef(name=schema_name, version=version)
        return ret if ret and req_ref.supports(ret.schema) else None

    def _set_raw(self, schema_ref: PluginRef, obj: MetadataSchema) -> None:
        """Store metadata object as instance of passed schema at this node."""
        # reserve UUID, construct dataset path and store metadata object
        obj_uuid = self._mc.metador._links.fresh_uuid()
        obj_path = f"{self._base_dir}/{_ep_name_for(schema_ref)}={str(obj_uuid)}"
        # store object
        self._mc.__wrapped__[obj_path] = bytes(obj)
        obj_node = self._mc.__wrapped__[obj_path]
        assert isinstance(obj_node, H5DatasetLike)
        stored_obj = StoredMetadata(uuid=obj_uuid, schema=schema_ref, node=obj_node)
        # BUG FIX: key by schema *name*, consistent with __init__/_get_raw/_del_raw.
        # (previously keyed by the PluginRef object itself, making the stored
        # object invisible to name-based lookups and the duplicate check)
        self._objs[schema_ref.name] = stored_obj
        # update TOC
        self._mc.metador._links.register(stored_obj)
        return

    def _del_raw(self, schema_name: str, *, _unlink: bool = True) -> None:
        """Delete stored metadata for given schema at this node."""
        # NOTE: _unlink is only for the destroy method
        stored_obj = self._objs[schema_name]
        # unregister in TOC (will also trigger clean up there)
        if _unlink:
            self._mc.metador._links.unregister(stored_obj.uuid)
        # remove metadata object
        del self._objs[stored_obj.schema.name]
        del self._mc.__wrapped__[stored_obj.node.name]
        # no metadata objects left -> remove metadata dir
        if not self._objs:
            del self._mc.__wrapped__[self._base_dir]
        return

    # helpers for container-level operations (move, copy, delete etc)

    def _destroy(self, *, _unlink: bool = True):
        """Unregister and delete all metadata objects attached to this node."""
        # NOTE: _unlink is only set to false for node copy without metadata
        for schema_name in list(self.keys()):
            self._del_raw(schema_name, _unlink=_unlink)

    # ----

    def __init__(self, node: MetadorNode):
        self._mc: MetadorContainer = node._self_container
        """Underlying container (for convenience)."""

        self._node: MetadorNode = node
        """Underlying actual user node."""

        is_dataset = isinstance(node, H5DatasetLike)
        self._base_dir: str = M.to_meta_base_path(node.name, is_dataset)
        """Path of this metador metadata group node.

        Actual node exists iff any metadata is stored for the node.
        """

        self._objs: Dict[str, StoredMetadata] = {}
        """Information about available metadata objects (keyed by schema name)."""

        # load available object metadata encoded in the node names
        meta_grp = cast(H5GroupLike, self._mc.__wrapped__.get(self._base_dir, {}))
        for obj_node in meta_grp.values():
            assert isinstance(obj_node, H5DatasetLike)
            obj = StoredMetadata.from_node(obj_node)
            self._objs[obj.schema.name] = obj

    # ----

    def keys(self) -> KeysView[str]:
        """Return names of explicitly attached metadata objects.

        Transitive parent schemas are not included.
        """
        return self._objs.keys()

    def values(self) -> ValuesView[StoredMetadata]:
        """Return info objects of explicitly attached metadata objects."""
        self._node._guard_acl(NodeAcl.skel_only)
        return self._objs.values()

    def items(self) -> ItemsView[str, StoredMetadata]:
        """Return (schema name, info object) pairs of attached metadata objects."""
        self._node._guard_acl(NodeAcl.skel_only)
        return self._objs.items()

    # ----

    def __len__(self) -> int:
        """Return number of explicitly attached metadata objects.

        Transitive parent schemas are not counted.
        """
        return len(self.keys())

    def __iter__(self) -> Iterator[str]:
        """Iterate listing schema names of all actually attached metadata objects.

        Transitive parent schemas are not included.
        """
        return iter(self.keys())

    # ----

    def query(
        self,
        schema: Union[
            str, Tuple[str, Optional[SemVerTuple]], PluginRef, Type[MetadataSchema]
        ] = "",
        version: Optional[SemVerTuple] = None,
    ) -> Iterator[PluginRef]:
        """Return schema names for which objects at this node are compatible with passed schema.

        Will also consider compatible child schema instances.

        Returned iterator will yield passed schema first, if an object is available.
        Apart from this, the order is not specified.
        """
        schema_name, schema_ver = plugin_args(schema, version)
        # no schema selected -> list everything
        if not schema_name:
            for obj in self.values():
                yield obj.schema
            return

        # try exact schema (in any compatible version, if version specified)
        if obj := self._get_raw(schema_name, schema_ver):
            yield obj.schema

        # next, try compatible child schemas of compatible versions of requested schema
        compat = set().union(
            *(
                self._mc.metador.schemas.children(ref)
                for ref in self._mc.metador.schemas.versions(schema_name, schema_ver)
            )
        )
        avail = {self._get_raw(s).schema for s in self.keys()}
        for s_ref in avail.intersection(compat):
            yield s_ref

    def __contains__(
        self,
        schema: Union[
            str, Tuple[str, Optional[SemVerTuple]], PluginRef, Type[MetadataSchema]
        ],
    ) -> bool:
        """Check whether a compatible metadata object for given schema exists.

        Will also consider compatible child schema instances.
        """
        if schema == "" or (isinstance(schema, tuple) and schema[0] == ""):
            return False  # empty query lists everything, here the logic is inverted!
        return next(self.query(schema), None) is not None

    @overload
    def __getitem__(self, schema: str) -> MetadataSchema:
        ...

    @overload
    def __getitem__(self, schema: Type[S]) -> S:
        ...

    def __getitem__(self, schema: Union[str, Type[S]]) -> Union[S, MetadataSchema]:
        """Like get, but will raise KeyError on failure."""
        if ret := self.get(schema):
            return ret
        raise KeyError(schema)

    @overload
    def get(
        self, schema: str, version: Optional[SemVerTuple] = None
    ) -> Optional[MetadataSchema]:
        ...

    @overload
    def get(
        self, schema: Type[S], version: Optional[SemVerTuple] = None
    ) -> Optional[S]:
        ...

    def get(
        self, schema: Union[str, Type[S]], version: Optional[SemVerTuple] = None
    ) -> Optional[Union[MetadataSchema, S]]:
        """Get a parsed metadata object matching the given schema (if it exists).

        Will also consider compatible child schema instances.
        """
        self._node._guard_acl(NodeAcl.skel_only)

        # normalize arguments
        schema_name, schema_ver = plugin_args(schema, version)

        # get a compatible schema instance that is available at this node
        compat_schema = next(self.query(schema_name, schema_ver), None)
        if not compat_schema:
            return None  # not found

        # get class of schema and parse object
        schema_class = self._require_schema(schema_name, schema_ver)
        if obj := self._get_raw(compat_schema.name, compat_schema.version):
            return cast(S, self._parse_obj(schema_class, obj.node[()]))
        return None

    def __setitem__(
        self, schema: Union[str, Type[S]], value: Union[Dict[str, Any], MetadataSchema]
    ) -> None:
        """Store metadata object as instance of given schema.

        Raises KeyError if passed schema is not installed in environment.

        Raises TypeError if passed schema is marked auxiliary.

        Raises ValueError if an object for the schema already exists.

        Raises ValidationError if passed object is not valid for the schema.
        """
        self._node._guard_acl(NodeAcl.read_only)
        schema_name, schema_ver = plugin_args(schema)

        # if self.get(schema_name, schema_ver): # <- also subclass schemas
        # NOTE: for practical reasons let's be more lenient here and allow redundancy
        # hence only check if exact schema (modulo version) is already there
        if self._get_raw(schema_name):  # <- only same schema
            msg = f"Metadata object for schema {schema_name} already exists!"
            raise ValueError(msg)

        schema_class = self._require_schema(schema_name, schema_ver)
        checked_obj = self._parse_obj(schema_class, value)
        self._set_raw(schema_class.Plugin.ref(), checked_obj)

    def __delitem__(self, schema: Union[str, Type[MetadataSchema]]) -> None:
        """Delete metadata object explicitly stored for the passed schema.

        If a schema class is passed, its version is ignored,
        as each node may contain at most one explicit instance per schema.

        Raises KeyError if no metadata object for that schema exists.
        """
        self._node._guard_acl(NodeAcl.read_only)
        schema_name, _ = plugin_args(schema)

        if self._get_raw(schema_name) is None:
            raise KeyError(schema_name)  # no (explicit) metadata object

        self._del_raw(schema_name)

420 

421 

422# ---- 

423 

424 

class TOCLinks:
    """Link management for synchronizing metadata objects and container TOC.

    The TOC stores one pseudo-symlink dataset per metadata object (grouped by
    schema); each link dataset contains the path of the actual object node.
    """

    # NOTE: This is not exposed to the end-user

    @staticmethod
    def _link_path_for(schema_ref: PluginRef) -> str:
        # TOC group path under which links for instances of this schema live
        return f"{M.METADOR_LINKS_PATH}/{_ep_name_for(schema_ref)}"

    def __init__(self, raw_cont: H5FileLike, toc_schemas: TOCSchemas):
        self._raw: H5FileLike = raw_cont
        """Raw underlying container (for quick access)."""

        self._toc_schemas = toc_schemas
        """Schemas used in container (to (un)register)."""

        self._toc_path: Dict[UUID, Optional[str]] = {}
        """Maps metadata object UUIDs to paths of respective pseudo-symlink in TOC.

        A value of None means the UUID is reserved but not registered yet
        (see fresh_uuid / register).
        """

        # load links into memory
        if M.METADOR_LINKS_PATH in self._raw:
            link_grp = self._raw.require_group(M.METADOR_LINKS_PATH)
            assert isinstance(link_grp, H5GroupLike)
            # layout: links group -> one subgroup per schema -> one dataset per UUID
            for schema_link_grp in link_grp.values():
                assert isinstance(schema_link_grp, H5GroupLike)
                for uuid, link_node in schema_link_grp.items():
                    assert isinstance(link_node, H5DatasetLike)
                    self._toc_path[UUID(uuid)] = link_node.name

    def fresh_uuid(self) -> UUID:
        """Return a UUID string not used for a metadata object in the container yet."""
        fresh = False
        ret: UUID
        # NOTE: here a very unlikely race condition is present if parallelized
        while not fresh:
            ret = uuid1()
            fresh = ret not in self._toc_path
        self._toc_path[ret] = None  # not assigned yet, but "reserved"
        # ----
        return ret

    def resolve(self, uuid: UUID) -> str:
        """Get the path a UUID in the TOC points to.

        Raises KeyError if the UUID is unknown.
        """
        link_path = self._toc_path[uuid]
        link_node = cast(H5DatasetLike, self._raw[link_path])
        # link datasets store the target path as UTF-8 bytes
        return link_node[()].decode("utf-8")

    def update(self, uuid: UUID, new_target: str):
        """Update target of an existing link to point to a new location."""
        link_path = self._toc_path[uuid]
        # datasets cannot be overwritten in-place -> delete and recreate
        del self._raw[link_path]
        self._raw[link_path] = new_target

    def register(self, obj: StoredMetadata) -> None:
        """Create a link for a metadata object in container TOC.

        The link points to the metadata object.
        """
        # ensure schema metadata (JSON schema, deps) is stored in the container
        self._toc_schemas._register(obj.schema)

        toc_path = f"{self._link_path_for(obj.schema)}/{obj.uuid}"
        self._toc_path[obj.uuid] = toc_path
        self._raw[toc_path] = str(obj.node.name)

    def unregister(self, uuid: UUID) -> None:
        """Unregister metadata object in TOC given its UUID.

        Will remove the object and clean up empty directories in the TOC.
        """
        # delete the link itself and free the UUID
        toc_path = self._toc_path[uuid]

        # grab parent groups before deleting, for the cleanup steps below
        schema_group = self._raw[toc_path].parent
        assert isinstance(schema_group, H5GroupLike)
        link_group = schema_group.parent
        assert link_group.name == M.METADOR_LINKS_PATH

        del self._raw[toc_path]
        del self._toc_path[uuid]
        if len(schema_group):
            return  # schema still has instances

        s_name_vers: str = schema_group.name.split("/")[-1]
        # delete empty group for schema
        del self._raw[schema_group.name]
        # notify schema manager (cleans up schema + package info)
        self._toc_schemas._unregister(_schema_ref_for(s_name_vers))

        if len(link_group.keys()):
            return  # container still has metadata
        else:
            # remove the link dir itself (no known metadata in container left)
            del self._raw[link_group.name]

    # ----

    def find_broken(self, repair: bool = False) -> List[UUID]:
        """Return list of UUIDs in TOC not pointing to an existing metadata object.

        Will use loaded cache of UUIDs and check them, without scanning the container.

        If repair is set, will remove those broken links.
        """
        broken = []
        for uuid in self._toc_path.keys():
            target = self.resolve(uuid)
            if target not in self._raw:
                broken.append(uuid)
        if repair:
            for uuid in broken:
                self.unregister(uuid)
        return broken

    def find_missing(self, path: H5GroupLike) -> List[H5DatasetLike]:
        """Return list of metadata objects (under path) not listed in TOC."""
        missing = []

        def collect_missing(_, node):
            # visititems callback: collect metadata datasets unknown to the TOC
            if not M.is_internal_path(node.name, M.METADOR_META_PREF):
                return  # not a metador metadata path
            if M.is_meta_base_path(node.name):
                # top dir, not a "link dataset",
                # e.g. /.../foo/metador_meta_ or /.../metador_meta_foo
                return

            # now we assume we have a path to a metadata link object in the group
            obj = StoredMetadata.from_node(node)
            known = obj.uuid in self._toc_path
            # check UUID collision: i.e., used in TOC, but points elsewhere
            # (requires fixing up the name of this object / new UUID)
            # implies that THIS object IS missing in the TOC
            collision = known and self.resolve(obj.uuid) != node.name
            if not known or collision:
                missing.append(node)

        # ensure its a group and collect
        self._raw.require_group(path.name).visititems(collect_missing)
        return missing

    def repair_missing(
        self, missing: List[H5DatasetLike], update: bool = False
    ) -> None:
        """Repair links (objects get new UUIDs, unless update is true)."""
        # NOTE: needed for correct copy and move of nodes with their metadata
        for node in missing:
            obj = StoredMetadata.from_node(node)
            if update and obj.uuid in self._toc_path:
                # update target of existing link (e.g. for move)
                self.update(obj.uuid, node.name)
            else:
                # assign new UUID (e.g. for copy)
                # (copied metadata node refers to some other uuid in the name)
                obj.uuid = self.fresh_uuid()
                new_path = obj.to_path()
                # rename the metadata node to point to the new UUID
                self._raw.move(node.name, new_path)
                obj.node = cast(H5DatasetLike, self._raw[new_path])
                # register the object with the new UUID in the TOC
                self.register(obj)

584 

585 

class TOCSchemas:
    """Schema management for schemas used in the container.

    Interface is made to mimic PGSchema wherever it makes sense.
    """

    @classmethod
    def _schema_path_for(cls, s_ref: PluginRef) -> str:
        """Return container path of the group storing info about this schema."""
        return f"{M.METADOR_SCHEMAS_PATH}/{to_ep_name(s_ref.name, s_ref.version)}"

    @classmethod
    def _jsonschema_path_for(cls, s_ref: PluginRef) -> str:
        """Return container path of the stored JSON Schema of this schema."""
        return f"{cls._schema_path_for(s_ref)}/jsonschema.json"

    @staticmethod
    def _load_json(node: H5DatasetLike):
        """Parse a JSON document stored as UTF-8 bytes in a dataset node."""
        return json.loads(node[()].decode("utf-8"))

    def _update_parents_children(
        self, schema_ref: PluginRef, parents: Optional[List[PluginRef]]
    ):
        """Update the parent/child relation tables for an added or removed schema.

        Passing parents=None means the schema is being removed.
        """
        if parents is None:  # remove schema
            for parent in self._parents[schema_ref]:
                if parent in self._schemas:
                    self._children[parent].remove(schema_ref)
                elif all(
                    (child not in self._schemas for child in self._children[parent])
                ):
                    # parent only tracked transitively and no used children remain
                    del self._parents[parent]
                    del self._children[parent]
        else:  # add schema
            for i, parent in enumerate(parents):
                if parent not in self._parents:
                    # parent path prefix up to and including this parent
                    self._parents[parent] = parents[: i + 1]
                if parent not in self._children:
                    self._children[parent] = set()
                if parent != schema_ref:
                    self._children[parent].add(schema_ref)

    def _register(self, schema_ref: PluginRef):
        """Notify that a schema is used in the container (metadata object is created/updated).

        If the schema has not been used before in the container, will store metadata about it.
        """
        if schema_ref in self._schemas:
            return  # nothing to do

        # store json schema
        schema_cls = schemas.get(schema_ref.name, schema_ref.version)
        jsonschema_dat = schema_cls.schema_json().encode("utf-8")
        jsonschema_path = self._jsonschema_path_for(schema_ref)
        self._raw[jsonschema_path] = jsonschema_dat

        # store parent schema refs
        compat_path = f"{self._schema_path_for(schema_ref)}/compat"
        parents = schemas.parent_path(schema_ref.name, schema_ref.version)
        parents_dat: bytes = json.dumps([p.dict() for p in parents]).encode("utf-8")

        self._raw[compat_path] = parents_dat
        self._schemas.add(schema_ref)
        self._update_parents_children(schema_ref, parents)

        # add providing package (if no stored package provides it)
        if not self._pkgs._providers.get(schema_ref, []):
            env_pkg_info: PluginPkgMeta = schemas.provider(schema_cls.Plugin.ref())
            pkg_name_ver = (str(env_pkg_info.name), env_pkg_info.version)
            self._pkgs._register(pkg_name_ver, env_pkg_info)
            self._used[pkg_name_ver] = set()

        # update used schemas tracker for all packages providing this schema
        for pkg in self._pkgs._providers[schema_ref]:
            self._used[pkg].add(schema_ref)

    def _unregister(self, schema_ref: PluginRef):
        """Notify that a schema is not used at any container node anymore.

        If after that no schema of a listed dep package is used,
        this dependency will be removed from the container.
        """
        del self._raw[self._schema_path_for(schema_ref)]
        self._schemas.remove(schema_ref)
        self._update_parents_children(schema_ref, None)

        providers = set(self._pkgs._providers[schema_ref])
        for pkg in providers:
            pkg_used = self._used[pkg]
            if schema_ref in pkg_used:
                # remove schema from list of used schemas of pkg
                pkg_used.remove(schema_ref)
            if not len(pkg_used):
                # package not used anymore in container -> clean up
                self._pkgs._unregister(pkg)

        # remove schemas group if it is empty (no schemas used in container)
        if not self._raw.require_group(M.METADOR_SCHEMAS_PATH).keys():
            del self._raw[M.METADOR_SCHEMAS_PATH]

    def __init__(self, raw_cont: H5FileLike, toc_packages: TOCPackages):
        self._raw: H5FileLike = raw_cont
        """Raw underlying container (for quick access)."""

        self._pkgs = toc_packages
        """TOC package metadata manager object."""

        self._schemas: Set[PluginRef] = set()
        """Stored JSON Schemas of actually used schemas."""

        self._parents: Dict[PluginRef, List[PluginRef]] = {}
        """Parents of a used json schema (i.e. other partially compatible schemas)."""

        self._children: Dict[PluginRef, Set[PluginRef]] = {}
        """Children of a used json schema (i.e. other fully compatible schemas)."""

        self._used: Dict[PythonDep, Set[PluginRef]] = {}
        """package name + version -> name of schemas used in container"""

        for pkg in self._pkgs.keys():
            self._used[pkg] = set()

        # load info about schemas already stored in the container
        if M.METADOR_SCHEMAS_PATH in self._raw:
            schema_grp = self._raw.require_group(M.METADOR_SCHEMAS_PATH)
            for name, node in schema_grp.items():
                s_ref: PluginRef = _schema_ref_for(name)
                assert isinstance(node, H5GroupLike)
                compat = node["compat"]
                assert isinstance(compat, H5DatasetLike)

                reflist = json.loads(compat[()].decode("utf-8"))
                parents = list(map(PluginRef.parse_obj, reflist))

                self._schemas.add(s_ref)
                self._update_parents_children(s_ref, parents)
                for pkg in self._pkgs._providers[s_ref]:
                    self._used[pkg].add(s_ref)

    @property
    def packages(self) -> TOCPackages:
        """Like PluginGroup.packages, but with respect to schemas used in container."""
        return self._pkgs

    def provider(self, schema_ref: PluginRef) -> PluginPkgMeta:
        """Like PluginGroup.provider, but with respect to container deps.

        Raises KeyError if no stored package provides the schema.
        """
        pkg_name_ver = next(iter(self._pkgs._providers.get(schema_ref, [])), None)
        if pkg_name_ver is None:
            msg = f"Did not find metadata of a package providing schema: '{schema_ref}'"
            raise KeyError(msg)
        return self._pkgs[pkg_name_ver]

    def parent_path(
        self, schema, version: Optional[SemVerTuple] = None
    ) -> List[PluginRef]:
        """Like PGSchema.parent_path, but with respect to container deps."""
        name, vers = plugin_args(schema, version, require_version=True)
        s_ref = schemas.PluginRef(name=name, version=vers)
        return self._parents[s_ref]

    def versions(
        self, p_name: str, version: Optional[SemVerTuple] = None
    ) -> List[PluginRef]:
        """Like PGSchema.versions, but with respect to container deps."""
        # NOTE: using _children instead of _schemas because some are only listed
        # due to their appearance in the parent_path of some actually used schema
        # but we need them here for "parent compatibility" to work right.
        refs = list(filter(lambda s: s.name == p_name, self._children))

        if version is None:
            return refs
        # filter plugins for compatible version
        requested = schemas.PluginRef(name=p_name, version=version)
        # NOTE: here "supports" arguments are reversed (compared to "plugin versions")!
        # because its about instances (that must be "below" the requested schema version)
        return [ref for ref in refs if requested.supports(ref)]

    def children(self, schema, version: Optional[SemVerTuple] = None) -> Set[PluginRef]:
        """Like PGSchema.children, but with respect to container deps."""
        name, vers = plugin_args(schema, version)
        if vers is not None:
            s_refs = [schemas.PluginRef(name=name, version=vers)]
        else:
            # if no version is given, collect all possibilities
            s_refs = [ref for ref in self._children.keys() if ref.name == name]
        # return all that can be actually retrieved
        return set().union(
            *filter(lambda x: x is not None, map(self._children.get, s_refs))
        )

    # ----

    def __len__(self):
        return len(self._schemas)

    def __iter__(self):
        return iter(self.keys())

    def __contains__(self, schema_ref: PluginRef):
        return schema_ref in self._schemas

    def __getitem__(self, schema_ref: PluginRef):
        """Return stored JSON Schema for the given schema reference.

        Raises KeyError if the schema is not stored in the container.
        """
        node_path = self._jsonschema_path_for(schema_ref)
        # BUG FIX: raise KeyError instead of asserting, so get() can catch a
        # miss (an assert raises the wrong type and is stripped under -O)
        if node_path not in self._raw:
            raise KeyError(schema_ref)
        return self._load_json(cast(H5DatasetLike, self._raw[node_path]))

    def get(self, schema_ref: PluginRef):
        """Return stored JSON Schema for the given schema reference, or None."""
        try:
            # BUG FIX: previously the looked-up value was discarded,
            # so get() unconditionally returned None
            return self[schema_ref]
        except KeyError:
            return None

    def keys(self):
        return set(self._schemas)

    def values(self):
        return [self[k] for k in self.keys()]

    def items(self):
        return [(k, self[k]) for k in self.keys()]

804 

805 

# (distribution name, version) pair identifying a Python package dependency
PythonDep: TypeAlias = Tuple[str, SemVerTuple]

807 

808 

class TOCPackages:
    """Package metadata management for schemas used in the container.

    For each schema used in the container, information about one package
    providing that schema is always stored.

    If there are multiple providers of the same schema,
    the first/existing one is preferred.
    """

    @staticmethod
    def _pkginfo_path_for(pkg_name: str, pkg_version: SemVerTuple) -> str:
        """Return container path where info about this package is stored."""
        return f"{M.METADOR_PACKAGES_PATH}/{to_ep_name(pkg_name, pkg_version)}"

    def _add_providers(self, pkg: PythonDep, pkginfo: PluginPkgMeta):
        """Record the package as provider for each schema plugin it lists."""
        for schema_ref in pkginfo.plugins[schemas.name]:
            self._providers.setdefault(schema_ref, set()).add(pkg)

    def _register(self, pkg: PythonDep, info: PluginPkgMeta):
        """Store package info in the container and index its provided schemas."""
        self._raw[self._pkginfo_path_for(*pkg)] = bytes(info)
        self._pkginfos[pkg] = info
        self._add_providers(pkg, info)

    def _unregister(self, pkg: PythonDep):
        """Drop package info from the container and its provider entries."""
        del self._raw[self._pkginfo_path_for(*pkg)]
        info = self._pkginfos.pop(pkg)
        # unregister providers
        for schema_ref in info.plugins[schemas.name]:
            remaining = self._providers[schema_ref]
            remaining.remove(pkg)
            if not remaining:  # schema not provided by any package
                del self._providers[schema_ref]

        # remove packages group if it is empty (no package info left in container)
        if not self._raw.require_group(M.METADOR_PACKAGES_PATH).keys():
            del self._raw[M.METADOR_PACKAGES_PATH]

    def __init__(self, raw_container: H5FileLike):
        self._raw: H5FileLike = raw_container
        """Raw underlying container (for quick access)."""

        self._pkginfos: Dict[PythonDep, PluginPkgMeta] = {}
        """Package name + version -> package info"""

        self._providers: Dict[PluginRef, Set[PythonDep]] = {}
        """schema reference -> package name + version"""

        # parse package infos if they exist
        if M.METADOR_PACKAGES_PATH in self._raw:
            pkg_grp = self._raw.require_group(M.METADOR_PACKAGES_PATH)
            for ep_name, node in pkg_grp.items():
                dep: PythonDep = from_ep_name(EPName(ep_name))
                pkg_info = PluginPkgMeta.parse_raw(cast(H5DatasetLike, node)[()])
                self._pkginfos[dep] = pkg_info
                self._add_providers(dep, pkg_info)

    # ----

    def __len__(self):
        return len(self._pkginfos)

    def __iter__(self):
        return iter(self._pkginfos)

    def __contains__(self, pkg: PythonDep):
        return pkg in self._pkginfos

    def __getitem__(self, pkg: PythonDep):
        return self._pkginfos[pkg]

    def keys(self):
        return self._pkginfos.keys()

    def values(self):
        return self._pkginfos.values()

    def items(self):
        return self._pkginfos.items()

892 

893 

894class MetadorContainerTOC: 

895 """Interface to the Metador metadata index (table of contents) of a container.""" 

896 

    def __init__(self, container: MetadorContainer):
        """Attach to (and, for a fresh writable container, initialize) the TOC.

        Raises:
            ValueError: If the container has an unsupported spec version, or
                is read-only without having Metador structures to attach to.
        """
        self._container = container
        # raw underlying (unwrapped) container, for direct low-level access
        self._raw = self._container.__wrapped__

        # stored spec version, or None if the container is not marked yet
        ver = self.spec_version if M.METADOR_VERSION_PATH in self._raw else None
        if ver:
            # list comparison: rejects spec major version >= 2
            if ver >= [2]:
                msg = f"Unsupported Metador container version: {ver}"
                raise ValueError(msg)
        else:
            # no version mark: cannot initialize structures in read-only mode
            if self._container.acl[NodeAcl.read_only]:
                msg = "Container is read-only and does not look like a Metador container! "
                msg += "Please open in writable mode to initialize Metador structures!"
                raise ValueError(msg)

            # writable + no version = fresh (for metador), initialize it
            self._raw[M.METADOR_VERSION_PATH] = M.METADOR_SPEC_VERSION
            self._raw[M.METADOR_UUID_PATH] = str(uuid1())

        # if we're here, we have a prepared container TOC structure

        # proceed to initialize TOC (order matters: schemas need packages, links need schemas)
        self._driver_type: MetadorDriverEnum = get_driver_type(self._raw)

        self._packages = TOCPackages(self._raw)
        self._schemas = TOCSchemas(self._raw, self._packages)
        self._links = TOCLinks(self._raw, self._schemas)

924 

925 # ---- 

926 

927 @property 

928 def driver_type(self) -> MetadorDriverEnum: 

929 """Return the type of the container driver.""" 

930 return self._driver_type 

931 

932 @property 

933 def driver(self) -> Type[MetadorDriver]: 

934 """Return the container driver class used by the container.""" 

935 return METADOR_DRIVERS[self.driver_type] 

936 

    @property
    def source(self) -> Any:
        """Return data underlying the container (file, set of files, etc. used with the driver)."""
        return get_source(self._raw, self.driver_type)

941 

942 # ---- 

943 

944 @property 

945 def container_uuid(self) -> UUID: 

946 """Return UUID of the container.""" 

947 uuid = self._raw[M.METADOR_UUID_PATH] 

948 uuid_ds = cast(H5DatasetLike, uuid) 

949 return UUID(uuid_ds[()].decode("utf-8")) 

950 

951 @property 

952 def spec_version(self) -> List[int]: 

953 """Return Metador container specification version of the container.""" 

954 ver = cast(H5DatasetLike, self._raw[M.METADOR_VERSION_PATH]) 

955 return list(map(int, ver[()].decode("utf-8").split("."))) 

956 

957 @property 

958 def schemas(self): 

959 """Information about all schemas used for metadata objects in this container.""" 

960 return self._schemas 

961 

962 def query( 

963 self, 

964 schema: Union[str, Type[S]], 

965 version: Optional[SemVerTuple] = None, 

966 *, 

967 node: Optional[MetadorNode] = None, 

968 ) -> Iterator[MetadorNode]: 

969 """Return nodes that contain a metadata object compatible with the given schema.""" 

970 schema_name, schema_ver = plugin_args(schema, version) 

971 if not schema_name: # could be e.g. empty string 

972 msg = "A schema name, plugin reference or class must be provided!" 

973 raise ValueError(msg) 

974 

975 start_node: MetadorNode = node or self._container["/"] 

976 

977 # check start node metadata explicitly 

978 if (schema_name, schema_ver) in start_node.meta: 

979 yield start_node 

980 

981 if not isinstance(start_node, H5GroupLike): 

982 return # the node is not group-like, cannot be traversed down 

983 

984 # collect nodes below start node recursively 

985 # NOTE: yielding from the collect_nodes does not work :'( 

986 # so we have to actually materialize the list >.< 

987 # but we expose only the generator interface anyway (better design) 

988 # (maybe consider replacing visititems with a custom traversal here) 

989 ret: List[MetadorNode] = [] 

990 

991 def collect_nodes(_, node: MetadorNode): 

992 if (schema_name, schema_ver) in node.meta: 

993 ret.append(node) 

994 

995 start_node.visititems(collect_nodes) 

996 yield from iter(ret)