# src/metador_core/ih5/overlay.py
1"""Overlay wrappers to access a virtual record consisting of a base container + patches. 

2 

3The wrappers take care of dispatching requests to records, 

4groups and attributes to the correct path. 

5""" 

6from __future__ import annotations 

7 

8import re 

9from dataclasses import dataclass 

10from enum import Enum 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Callable, 

15 Dict, 

16 List, 

17 Optional, 

18 Type, 

19 TypeVar, 

20 Union, 

21 cast, 

22) 

23 

24import h5py 

25import numpy as np 

26 

27from ..util.types import H5DatasetLike 

28 

29if TYPE_CHECKING: 

30 from ..util.types import H5GroupLike 

31 from .record import IH5Record 

32else: 

33 IH5Record = Any 

34 

35 

# dataset value marking a deleted group, dataset or attribute
DEL_VALUE = np.void(b"\x7f")  # ASCII DELETE

T = TypeVar("T")


def _is_del_mark(val) -> bool:
    return isinstance(val, np.void) and val.tobytes() == DEL_VALUE.tobytes()


def _node_is_del_mark(node) -> bool:
    """Return whether node is marking a deleted group/dataset/attribute value."""
    val = node[()] if isinstance(node, h5py.Dataset) else node
    return _is_del_mark(val)


# attribute key marking group substitution (instead of pass-through default for groups)
# attribute value does not matter, but should be h5py.Empty(None)
# if present, all children and attributes are interpreted as normal nodes, not as a patch
SUBST_KEY = "\x1a"  # ASCII SUBSTITUTE


def _node_is_virtual(node) -> bool:
    """Virtual node (i.e. transparent and only carrier for child nodes and attributes)."""
    return isinstance(node, h5py.Group) and SUBST_KEY not in node.attrs


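# Editorial sketch (added in editing, not used by the module): how these markers
# are expected to appear when a patch container is inspected directly with h5py.
# The `patch_file` argument is a placeholder for an opened patch file.
def _sketch_inspect_markers(patch_file: h5py.File) -> None:
    def visit(name, node):
        if _node_is_del_mark(node):
            print(f"{name}: deletion mark (DEL_VALUE)")
        elif isinstance(node, h5py.Group) and not _node_is_virtual(node):
            print(f"{name}: substituted group (SUBST_KEY set)")

    patch_file.visititems(visit)
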

@dataclass(frozen=True)
class IH5Node:
    """An overlay node wraps a group, dataset or attribute manager.

    It takes care of finding the correct container to look for the data
    and helps with patching data in a new patch container.

    It essentially lifts the interface of h5py from a single file to an IH5 record
    that may consist of a base container file and a number of patch containers.
    """

    _record: IH5Record
    """Record this node belongs to (needed to access the actual data)."""

    _gpath: str
    """Path in record that this node represents (absolute wrt. root of record)."""

    _cidx: int
    """Left boundary index for lookups in order of loaded containers, i.e.
    this node will not consider containers with smaller index than that.
    """

    def __post_init__(self):
        """Instantiate an overlay node."""
        if not self._gpath or self._gpath[0] != "/":
            raise ValueError("Path must be absolute!")
        if self._cidx < 0:
            raise ValueError("Creation index must be non-negative!")

    @property
    def _files(self) -> List[h5py.File]:
        return self._record.__files__

    def __hash__(self):
        """Hash an overlay node.

        Two nodes are equivalent if they are linked to the same
        open record and address the same entity.
        """
        return hash((id(self._record), self._gpath, self._cidx))

    def __bool__(self) -> bool:
        return bool(self._files) and all(map(bool, self._files))

    @property
    def _last_idx(self):
        """Index of the latest container."""
        return len(self._files) - 1

    @property
    def _is_read_only(self) -> bool:
        """Return true if the newest container is read-only and nothing can be written."""
        return not self._record._has_writable

    def _guard_open(self):
        """Check that the record is open (if it was closed, the files are gone)."""
        if not self:
            raise KeyError("Record is not open or accessible!")

    def _guard_read_only(self):
        if self._is_read_only:
            raise ValueError("Create a patch in order to change the container!")

    def _guard_value(self, data):
        if _is_del_mark(data):
            raise ValueError(f"Value '{data}' is forbidden, cannot assign!")
        if isinstance(data, IH5Node):
            raise ValueError("Hard links are not supported, cannot assign!")
        if isinstance(data, h5py.SoftLink) or isinstance(data, h5py.ExternalLink):
            raise ValueError("SymLink and ExternalLink not supported, cannot assign!")

    @classmethod
    def _latest_idx(cls, files, path: str) -> Optional[int]:
        """Return index of newest file where the group/dataset was overwritten/created.

        Returns None if not found or most recent value is a deletion mark.
        """
        idx = None
        for i in reversed(range(len(files))):
            if _node_is_del_mark(files[i][path]):
                return None
            elif _node_is_virtual(files[i][path]):
                idx = i
            else:
                return i  # some patch overrides the group
        return idx

    # path transformations

    def _parent_path(self) -> str:
        """Return path of the parent node (the root is its own parent)."""
        if self._gpath == "/":
            return "/"
        segs = self._gpath.split("/")[:-1]
        return "/" if segs == [""] else "/".join(segs)

    def _rel_path(self, path: str) -> str:
        """Return relative path based on node location, if passed path is absolute.

        If relative, returns the path back unchanged.
        """
        if path[0] != "/":
            return path
        if path.find(self._gpath) != 0:
            raise RuntimeError("Invalid usage, cannot strip non-matching prefix!")
        start_idx = len(self._gpath) + int(self._gpath != "/")
        return path[start_idx:]

    def _abs_path(self, path: str) -> str:
        """Return absolute path based on node location, if given path is relative.

        If absolute, returns the path back unchanged.
        """
        pref = self._gpath if self._gpath != "/" else ""
        return path if path and path[0] == "/" else f"{pref}/{path}"

    def _inspect_path(self, path):  # pragma: no cover
        """Print the node at this path in every container that contains it."""
        print(f"Path {path}:")
        for j in range(len(self._files)):
            if path in self._files[j]:
                node = self._files[j][path]
                print(f"  idx={j}: {type(node).__name__}")
                if isinstance(node, h5py.Dataset):
                    print("  ", node[()])


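# Editorial sketch (added in editing): the path helpers above are pure string
# manipulation, so they can be exercised on a throwaway node; the None record is
# only a placeholder and is never dereferenced here.
def _sketch_path_helpers() -> None:
    node = IH5Node(cast(Any, None), "/foo/bar", 0)
    assert node._parent_path() == "/foo"
    assert node._rel_path("/foo/bar/baz") == "baz"
    assert node._abs_path("baz") == "/foo/bar/baz"
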

class IH5InnerNode(IH5Node):
    """Common functionality for Group and AttributeManager.

    Will grant either access to child records/subgroups,
    or to the attributes attached to the group/dataset at a path in a record.
    """

    @property
    def _is_attrs(self) -> bool:
        return self.__is_attrs__

    def __init__(
        self,
        record: IH5Record,
        gpath: str,
        creation_idx: int,
        attrs: bool = False,
    ):
        """See `IH5Node` constructor.

        This variant represents an "overlay container", of which there are two types -
        a group (h5py.Group) and a set of attributes (h5py.AttributeManager).

        This class takes care of both (in order to avoid lots of code duplication),
        distinguishing them through the additional `attrs` flag.
        """
        super().__init__(record, gpath, creation_idx)
        # if attrs is set, this represents an AttributeManager, otherwise a group
        self.__is_attrs__: bool = attrs

    def _guard_key(self, key: str):
        """Check a key used with bracket accessor notation.

        (e.g. used for `__getitem__, __setitem__, __delitem__`)
        """
        if key == "":
            raise ValueError("Invalid empty path!")
        if key.find("@") >= 0:  # used as attribute separator in the skeleton! TODO
            raise ValueError(f"Invalid symbol '@' in key: '{key}'!")
        if re.match(r"^[!-~]+$", key) is None:
            raise ValueError("Invalid key: Only printable ASCII is allowed!")
        if self._is_attrs and (key.find("/") >= 0 or key == SUBST_KEY):
            raise ValueError(f"Invalid attribute key: '{key}'!")

    def _get_child_raw(self, key: str, cidx: int) -> Any:
        """Return given child (dataset, group, attribute) from given container."""
        if self._is_attrs:
            return self._files[cidx][self._gpath].attrs[key]
        else:
            return self._files[cidx][self._abs_path(key)]

    def _get_child(self, key: str, cidx: int) -> Any:
        """Like _get_child_raw, but wraps the result with an overlay class if needed."""
        val = self._get_child_raw(key, cidx)
        path = self._abs_path(key)
        if isinstance(val, h5py.Group):
            return IH5Group(self._record, path, cidx)
        elif isinstance(val, h5py.Dataset):
            return IH5Dataset(self._record, path, cidx)
        else:
            return val

    def _children(self) -> Dict[str, int]:
        """Return dict mapping from a child name to the most recent overriding patch idx.

        For datasets, dereferencing the child path in that container will give the data.
        For groups, the returned number is to be treated as the lower bound, i.e.
        the child creation_idx to recursively get the descendants.
        """
        self._guard_open()

        children: Dict[str, int] = {}
        is_virtual: Dict[str, bool] = {}
        for i in reversed(range(self._cidx, len(self._files))):
            if self._gpath not in self._files[i]:
                continue

            obj = self._files[i][self._gpath]
            if self._is_attrs:
                obj = obj.attrs
            assert isinstance(obj, (h5py.Group, h5py.AttributeManager))

            # keep most recent version of child node / attribute
            for k in obj.keys():
                if k not in children:
                    is_virtual[k] = _node_is_virtual(self._get_child_raw(k, i))
                    children[k] = i
                elif is_virtual[k]:  # .. and k in children!
                    # decrease lower bound
                    children[k] = min(children[k], i)

        # return resulting child nodes / attributes (without the deleted ones)
        # in alphabetical order,
        # in case of attributes, also excludes special SUBST marker attribute
        return {
            k: idx
            for k, idx in sorted(children.items(), key=lambda x: x[0])
            if (not self._is_attrs or k != SUBST_KEY)
            and not _node_is_del_mark(self._get_child_raw(k, idx))
        }

    def _get_children(self) -> List[Any]:
        """Get alphabetically ordered list of child nodes."""
        return [
            self._get_child(self._abs_path(k), idx)
            for k, idx in self._children().items()
        ]

    def _node_seq(self, path: str) -> List[IH5Node]:
        """Return node sequence (one node per path prefix) to given path.

        Returns:
            Sequence starting with the current node (if path is relative)
            or the root node (if absolute) followed by all successive
            children along the requested path that exist.
        """
        curr: IH5InnerNode = IH5Group(self._record) if path[0] == "/" else self

        ret: List[IH5Node] = [curr]
        if path == "/" or path == ".":  # special case
            return ret

        # access entity through child group sequence
        segs = path.strip("/").split("/")
        nxt_cidx = 0
        for i in range(len(segs)):
            seg, is_last_seg = segs[i], i == len(segs) - 1
            # find most recent container with that child
            nxt_cidx = curr._children().get(seg, -1)
            if nxt_cidx == -1:
                return ret  # not found -> return current prefix
            curr = curr._get_child(seg, nxt_cidx)  # proceed to child
            ret.append(curr)
            # catch invalid access, e.g. /foo is record, user accesses /foo/bar:
            if not is_last_seg and isinstance(curr, IH5Dataset):
                raise ValueError(f"Cannot access path inside a value: {curr._gpath}")
        # return path index sequence
        return ret

    def _find(self, key: str) -> Optional[int]:
        """Return index of container holding that key (attribute or path), if any.

        Args:
            key: nonempty string (attribute, or relative/absolute path)

        Returns:
            Index >= 0 of most recent container patching that path if found, else None.
        """
        if self._is_attrs:  # access an attribute by key (always "relative")
            return self._children().get(key, None)
        # access a path (absolute or relative)
        nodes = self._node_seq(key)
        return nodes[-1]._cidx if nodes[-1]._gpath == self._abs_path(key) else None

    # h5py-like interface

    def get(self, key: str, default=None):
        try:
            return self[key]
        except KeyError as e:
            if str(e).find("not open") < 0:
                return default
            else:
                raise

    def __getitem__(self, key: str):
        self._guard_open()
        self._guard_key(key)
        found_cidx = self._find(key)
        if found_cidx is None:
            raise KeyError(key)
        return self._get_child(key, found_cidx)

    def _expect_real_item_idx(self, key: str) -> int:
        found_cidx = self._find(key)
        if found_cidx is None or _node_is_del_mark(self._get_child(key, found_cidx)):
            raise KeyError(f"Cannot delete '{key}', it does not exist!")
        return found_cidx

    def __contains__(self, key: str):
        self._guard_key(key)
        return self._find(key) is not None

    def __iter__(self):
        return iter(self._children().keys())

    def __len__(self):
        return len(self.keys())

    def keys(self):
        return self._children().keys()

    def _dict(self):
        return {k: self._get_child(k, idx) for k, idx in self._children().items()}

    def values(self):
        return self._dict().values()

    def items(self):
        return self._dict().items()


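# Editorial sketch (added in editing): a small helper showing how the lookup
# above resolves children, assuming `node` is a group or attribute manager of an
# open record. Each child is reported with the index of the container providing
# it; deletion-marked entries never show up here.
def _sketch_show_resolution(node: IH5InnerNode) -> None:
    for key, cidx in node._children().items():
        print(f"{key}: provided by container #{cidx}")
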

class IH5Dataset(IH5Node):
    """`IH5Node` representing a `h5py.Dataset`, i.e. a leaf of the tree."""

    def __init__(self, files, gpath, creation_idx):
        super().__init__(files, gpath, creation_idx)

    def copy_into_patch(self):
        """Copy the most recent value at this path into the current patch.

        This is useful e.g. for editing inside a complex value, such as an array.
        """
        self._guard_open()
        self._guard_read_only()
        if self._cidx == self._last_idx:
            raise ValueError("Cannot copy, this node is already from latest patch!")
        # copy value from older container to current patch
        self._files[-1][self._gpath] = self[()]

    # h5py-like interface
    @property
    def name(self) -> str:
        return self._gpath

    @property
    def file(self) -> IH5Record:
        return self._record

    @property
    def parent(self) -> IH5Group:
        return self._record[self._parent_path()]

    @property
    def attrs(self) -> IH5AttributeManager:
        self._guard_open()
        return IH5AttributeManager(self._record, self._gpath, self._cidx)

    # this one is also needed to work with H5DatasetLike
    @property
    def ndim(self) -> int:
        return self._files[self._cidx][self._gpath].ndim  # type: ignore

    # for a dataset, the numpy data is indexed instead of paths; at this level
    # the patching mechanism ends, so we just pass through to h5py

    def __getitem__(self, key):
        # just pass through dataset indexing to underlying dataset
        self._guard_open()
        return self._files[self._cidx][self._gpath][key]  # type: ignore

    def __setitem__(self, key, val):
        self._guard_open()
        self._guard_read_only()
        if self._cidx != self._last_idx:
            raise ValueError(f"Cannot set '{key}', node is not from the latest patch!")
        # if we're in the latest patch, allow writing as usual (pass through)
        self._files[-1][self._gpath][key] = val  # type: ignore


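# Editorial sketch (added in editing): the intended use of copy_into_patch is
# editing part of an existing array value once a writable patch exists. `ds` is
# a placeholder for an IH5Dataset holding an (at least 1-dimensional) numeric
# array; re-resolving the node afterwards yields one backed by the newest patch.
def _sketch_edit_in_patch(ds: IH5Dataset) -> None:
    if ds._cidx != ds._last_idx:
        ds.copy_into_patch()
    latest = ds.parent[ds.name.split("/")[-1]]  # node backed by the newest container
    latest[0] = 42  # item assignment now passes straight through to h5py
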

class IH5AttributeManager(IH5InnerNode):
    """`IH5Node` representing an `h5py.AttributeManager`."""

    def __init__(self, files, gpath, creation_idx):
        super().__init__(files, gpath, creation_idx, True)

    def __setitem__(self, key: str, val):
        self._guard_open()
        self._guard_read_only()
        self._guard_key(key)
        self._guard_value(val)

        # if path does not exist in current patch, just create "virtual node"
        if self._gpath not in self._files[-1]:
            self._files[-1].create_group(self._gpath)
        # deletion marker at `key` (if set) is overwritten automatically here
        # so no need to worry about removing it before assigning `val`
        self._files[-1][self._gpath].attrs[key] = val

    def __delitem__(self, key: str):
        self._guard_open()
        self._guard_read_only()
        self._guard_key(key)
        # remove the entity if it is found in newest container,
        # mark the path as deleted if doing a patch and not working on base container
        if self._expect_real_item_idx(key) == self._last_idx:
            del self._files[-1][self._gpath].attrs[key]
        if len(self._files) > 1:  # is a patch?
            if self._gpath not in self._files[-1]:  # no node at path in latest?
                self._files[-1].create_group(self._gpath)  # create "virtual" node
            self._files[-1][self._gpath].attrs[key] = DEL_VALUE  # mark deleted


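# Editorial sketch (added in editing): typical attribute patching through the
# manager above, assuming `attrs` belongs to a record with a writable patch;
# the key names are placeholders.
def _sketch_patch_attrs(attrs: IH5AttributeManager) -> None:
    attrs["comment"] = "updated in patch"  # shadows any older value for this key
    if "obsolete" in attrs:
        del attrs["obsolete"]  # stores a DEL_VALUE marker instead of erasing history
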

class IH5Group(IH5InnerNode):
    """`IH5Node` representing a `h5py.Group`."""

    def _require_node(self, name: str, node_type: Type[T]) -> Optional[T]:
        # helper for require_{group|dataset}
        grp = self.get(name)
        if isinstance(grp, node_type):
            return grp
        if grp is not None:
            msg = f"Incompatible object ({type(grp).__name__}) already exists"
            raise TypeError(msg)
        return None

    def __init__(self, record, gpath: str = "/", creation_idx: Optional[int] = None):
        if gpath == "/":
            creation_idx = 0
        if creation_idx is None:
            raise ValueError("Need creation_idx for path != '/'!")
        super().__init__(record, gpath, creation_idx, False)

    def _create_virtual(self, path: str) -> bool:
        nodes = self._node_seq(path)
        path = self._abs_path(path)
        if (
            nodes[-1]._gpath == path
            and nodes[-1]._cidx == self._last_idx
            and not _node_is_del_mark(nodes[-1])
        ):
            return False  # something at that path in most recent container exists

        # most recent entity is a deletion marker or not existing?
        if nodes[-1]._gpath != path or _node_is_del_mark(nodes[-1]):
            suf_segs = nodes[-1]._rel_path(path).split("/")
            # create "overwrite" group in most recent patch...
            self.create_group(f"{nodes[-1]._gpath}/{suf_segs[0]}")
            # ... and create (nested) virtual group node(s), if needed
            if len(suf_segs) > 1:
                self._files[-1].create_group(path)

        return True

    # h5py-like interface

    def __setitem__(self, path: str, value):
        return self.create_dataset(path, data=value)

    def __delitem__(self, key: str):
        self._guard_open()
        self._guard_read_only()
        self._guard_key(key)
        self._expect_real_item_idx(key)
        # remove the entity if it is found in newest container,
        # mark the path as deleted if doing a patch and not working on base container
        path = self._abs_path(key)
        if path in self._files[-1]:
            del self._files[-1][path]
        if len(self._files) > 1:  # has patches? mark deleted (instead of real delete)
            self._files[-1][path] = DEL_VALUE

    @property
    def name(self) -> str:
        return self._gpath

    @property
    def file(self):  # -> IH5Record
        return self._record

    @property
    def parent(self) -> IH5Group:
        return self._record[self._parent_path()]

    @property
    def attrs(self) -> IH5AttributeManager:
        self._guard_open()
        return IH5AttributeManager(self._record, self._gpath, self._cidx)

    def create_group(self, name: str) -> IH5Group:
        self._guard_open()
        self._guard_read_only()

        path = self._abs_path(name)
        nodes = self._node_seq(path)
        if not isinstance(nodes[-1], IH5Group):
            raise ValueError(f"Cannot create group, {nodes[-1]._gpath} is a dataset!")
        if nodes[-1]._gpath == path:
            raise ValueError("Cannot create group, it already exists!")

        # remove "deleted" marker, if set at current path in current patch container
        if path in self._files[-1] and _node_is_del_mark(self._files[-1][path]):
            del self._files[-1][path]
        # create group (or fail if something else exists there already)
        self._files[-1].create_group(path)
        # if this is a patch: mark as non-virtual, i.e. "overwrite" with empty group
        # because the intent here is to "create", not update something.
        if len(self._files) > 1:
            self._files[-1][path].attrs[SUBST_KEY] = h5py.Empty(None)

        return IH5Group(self._record, path, self._last_idx)

    def create_dataset(
        self, path: str, shape=None, dtype=None, data=None, **kwargs
    ) -> IH5Dataset:
        self._guard_open()
        self._guard_read_only()
        self._guard_key(path)
        self._guard_value(data)

        if unknown_kwargs := set(kwargs.keys()) - {"compression", "compression_opts"}:
            raise ValueError(f"Unknown kwargs: {unknown_kwargs}")

        path = self._abs_path(path)
        fidx = self._find(path)
        if fidx is not None:
            prev_val = self._get_child(path, fidx)
            if isinstance(prev_val, (IH5Group, IH5Dataset)):
                raise ValueError("Path exists, in order to replace - delete first!")

        if path in self._files[-1] and _node_is_del_mark(
            self._get_child_raw(path, self._last_idx)
        ):
            # remove deletion marker in latest patch, if set
            del self._files[-1][path]
        elif path not in self._files[-1]:
            # create path and overwrite-group in latest patch
            self._create_virtual(path)
            assert path in self._files[-1]
            del self._files[-1][path]

        self._files[-1].create_dataset(  # actually create it, finally
            path, shape=shape, dtype=dtype, data=data, **kwargs
        )
        return IH5Dataset(self._record, path, self._last_idx)

    def require_group(self, name: str) -> IH5Group:
        if (n := self._require_node(name, IH5Group)) is not None:
            return n  # existing group
        return self.create_group(name)

    def require_dataset(self, name: str, *args, **kwds) -> IH5Dataset:
        if (n := self._require_node(name, IH5Dataset)) is not None:
            # TODO: check dimensions etc, copy into patch if it fits
            return n
        return self.create_dataset(name, *args, **kwds)

    def copy(self, source: CopySource, dest: CopyDest, **kwargs):
        src_node = self[source] if isinstance(source, str) else source
        name: str = kwargs.pop("name", src_node.name.split("/")[-1])
        dst_name: str
        if isinstance(dest, str):
            # if dest is a path, ignore inferred/passed name
            segs = self._abs_path(dest).split("/")
            dst_group = self.require_group("/".join(segs[:-1]) or "/")
            dst_name = segs[-1]
        else:
            # given dest is a group node, use inferred/passed name
            dst_group = dest if dest.name != "/" else dest["/"]  # *
            # * ugly workaround for treating files as groups in the copy method
            dst_name = name
        return h5_copy_from_to(src_node, cast(Any, dst_group), dst_name, **kwargs)

    def move(self, source: str, dest: str):
        self.copy(source, dest)
        del self[source]

    def visititems(self, func: Callable[[str, object], Optional[Any]]) -> Any:
        self._guard_open()
        stack = list(reversed(self._get_children()))
        while stack:
            curr = stack.pop()
            val = func(self._rel_path(curr._gpath), curr)
            if val is not None:
                return val
            if isinstance(curr, IH5Group):
                stack += reversed(curr._get_children())

    def visit(self, func: Callable[[str], Optional[Any]]) -> Any:
        return self.visititems(lambda x, _: func(x))


CopySource = Union[str, IH5Group, IH5Dataset, h5py.Group, h5py.Dataset]
CopyDest = Union[str, IH5Group, h5py.Group]

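# Editorial sketch (added in editing): the h5py-like surface of IH5Group in one
# place, assuming `root` is the root group of a record that can currently be
# written to. All paths and attribute names are placeholders.
def _sketch_group_usage(root: IH5Group) -> None:
    grp = root.require_group("experiment")
    grp.create_dataset("raw", data=np.arange(5))
    grp["raw"].attrs["unit"] = "counts"
    root.copy("experiment/raw", "backup/raw")  # copy within the same record
    root.move("experiment/raw", "experiment/data")
    root.visit(print)  # print every reachable path, relative to root
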

# ----
# Helpers for IH5 / H5 interop (it's all h5py at the bottom anyway, so it's easy)


class H5Type(str, Enum):
    """Type of an entity in a HDF5-like container.

    We list only those we care about, ignoring various
    link types etc.

    This will be used in wrappers around HDF5-like objects
    instead of using isinstance/subclass checks to implement
    duck-typing based decorator functionality that can
    work with (at least) raw HDF5, IH5 and IH5+Manifest.
    """

    group = "group"  # possibly nested, dict-like
    dataset = "dataset"  # = wrapped, indexable data
    attribute_set = "attribute-set"  # = not further nested, dict-like
    attribute = "attribute"  # = unwrapped data

    def __repr__(self) -> str:
        return f"{type(self).__name__}.{self.value}"
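    # Editorial note: since H5Type mixes in str, members compare equal to their
    # value (e.g. H5Type.group == "group"), and repr(H5Type.group) reads
    # "H5Type.group" thanks to the __repr__ above.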


def h5_copy_from_to(
    source_node: Union[H5DatasetLike, H5GroupLike],
    target_group: H5GroupLike,
    target_path: str,
    **kwargs,
):
    """Copy a dataset or group from one container to a fresh location.

    This also works between HDF5 and IH5.

    The source node must be a group or dataset object.
    The target node must be an existing group object.
    The target path must be a fresh path relative to the target node.
    """
    without_attrs: bool = kwargs.pop("without_attrs", False)
    shallow: bool = kwargs.pop("shallow", False)
    for arg in ["expand_soft", "expand_external", "expand_refs"]:
        if not kwargs.pop(arg, True):
            raise ValueError("IH5 does not support keeping references!")
    if kwargs:
        raise ValueError(f"Unknown keyword arguments: {kwargs}")

    if not target_path or target_path[0] == "/":
        raise ValueError("Target path must be non-empty and relative!")
    if target_path in target_group:
        raise ValueError(f"Target path {target_path} already exists in target group!")

    def copy_attrs(src_node, trg_node):
        if not without_attrs:
            trg_atrs = trg_node.attrs
            for k, v in src_node.attrs.items():
                trg_atrs[k] = v

    if isinstance(source_node, H5DatasetLike):
        node = target_group.create_dataset(target_path, data=source_node[()])
        copy_attrs(source_node, node)  # copy dataset attributes
    else:
        trg_root = target_group.create_group(target_path)
        copy_attrs(source_node, trg_root)  # copy source node attributes

        def copy_children(name, src_child):
            # name is relative to source root -> can use it
            if isinstance(src_child, H5DatasetLike):
                trg_root[name] = src_child[()]
            else:  # must be grouplike
                trg_root.create_group(name)
            copy_attrs(src_child, trg_root[name])

        if shallow:  # only immediate children
            for name, src_child in source_node.items():
                copy_children(name, src_child)
        else:  # recursive copy
            source_node.visititems(copy_children)
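

# Editorial sketch (added in editing): exporting a subtree of an overlay record
# into a plain HDF5 file, the HDF5/IH5 interop case mentioned in the docstring
# above. `group` may be an IH5Group or an h5py.Group; `out_path` is a placeholder
# for a writable target file name.
def _sketch_export_to_hdf5(group: IH5Group, out_path: str) -> None:
    with h5py.File(out_path, "w") as out:
        h5_copy_from_to(group, out, "exported")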