Coverage for src/metador_core/ih5/overlay.py: 97%

398 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-11 11:01 +0000

1"""Overlay wrappers to access a virtual record consisting of a base container + patches. 

2 

3The wrappers take care of dispatching requests to records, 

4groups and attributes to the correct path. 

5""" 

6from __future__ import annotations 

7 

8import re 

9from dataclasses import dataclass 

10from enum import Enum 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Callable, 

15 Dict, 

16 List, 

17 Optional, 

18 Type, 

19 TypeVar, 

20 Union, 

21 cast, 

22) 

23 

24import h5py 

25import numpy as np 

26 

27from ..container.protocols import H5DatasetLike 

28 

29if TYPE_CHECKING: 

30 from ..container.protocols import H5GroupLike 

31 from .record import IH5Record 

32else: 

33 IH5Record = Any 

34 

35 

# Sentinel dataset/attribute value marking a deleted group, dataset or attribute.
# It is a single opaque byte wrapped in np.void; `_is_del_mark` recognizes it by
# comparing the raw bytes.
DEL_VALUE = np.void(b"\x7f")  # ASCII DELETE

# Generic type variable (used by `IH5Group._require_node` for the expected node type).
T = TypeVar("T")

40 

41 

def _is_del_mark(val) -> bool:
    """Return whether `val` is the deletion marker value (`DEL_VALUE`).

    Only `np.void` values can be deletion markers; they are compared bytewise.
    """
    if not isinstance(val, np.void):
        return False
    return val.tobytes() == DEL_VALUE.tobytes()

44 

45 

def _node_is_del_mark(node) -> bool:
    """Return whether node is marking a deleted group/dataset/attribute value.

    A `h5py.Dataset` is dereferenced first, anything else is checked as-is.
    """
    if isinstance(node, h5py.Dataset):
        # the marker is stored as the dataset value, so read it out first
        return _is_del_mark(node[()])
    return _is_del_mark(node)

50 

51 

# Attribute key marking group substitution (instead of the pass-through default
# for groups). The attribute value does not matter, but should be h5py.Empty(None).
# If the key is present, all children and attributes of the group are interpreted
# as normal nodes, not as a patch.
SUBST_KEY = "\x1a"  # ASCII SUBSTITUTE

56 

57 

def _node_is_virtual(node) -> bool:
    """Virtual node (i.e. transparent and only carrier for child nodes and attributes).

    Only groups can be virtual, and only if they do not carry the `SUBST_KEY`
    marker attribute.
    """
    if not isinstance(node, h5py.Group):
        return False
    return SUBST_KEY not in node.attrs

61 

62 

@dataclass(frozen=True)
class IH5Node:
    """An overlay node wraps a group, dataset or attribute manager.

    It takes care of finding the correct container to look for the data
    and helps with patching data in a new patch container.

    It essentially lifts the interface of h5py from a single file to an IH5 record
    that may consist of a base container file and a number of patch containers.
    """

    _record: IH5Record
    """Record this node belongs to (needed to access the actual data)."""

    _gpath: str
    """Path in record that this node represents (absolute wrt. root of record)."""

    _cidx: int
    """Left boundary index for lookups in order of loaded containers, i.e.
    this node will not consider containers with smaller index than that.
    """

    def __post_init__(self):
        """Validate the location of a freshly instantiated overlay node.

        Raises:
            ValueError: If the path is empty or not absolute, or if the
                container index is negative.
        """
        if not self._gpath or self._gpath[0] != "/":
            raise ValueError("Path must be absolute!")
        if self._cidx < 0:
            raise ValueError("Creation index must be non-negative!")

    @property
    def _files(self) -> List[h5py.File]:
        """List of container files of the record this node belongs to."""
        return self._record.__files__

    def __hash__(self):
        """Hash an overlay node.

        Two nodes are equivalent if they are linked to the same
        open record and address the same entity.
        """
        return hash((id(self._record), self._gpath, self._cidx))

    def __bool__(self) -> bool:
        """Return true if the record has files and all of them are truthy."""
        return bool(self._files) and all(map(bool, self._files))

    @property
    def _last_idx(self):
        """Index of the latest container."""
        return len(self._files) - 1

    @property
    def _is_read_only(self) -> bool:
        """Return true if the newest container is read-only and nothing can be written."""
        return not self._record._has_writable

    def _guard_open(self):
        """Check that the record is open (if it was closed, the files are gone).

        Raises:
            ValueError: If the node evaluates to false (see `__bool__`).
        """
        if not self:
            raise ValueError("Record is not open or accessible!")

    def _guard_read_only(self):
        """Check that the record can be written to.

        Raises:
            ValueError: If the record is read-only.
        """
        if self._is_read_only:
            raise ValueError("Create a patch in order to change the container!")

    def _guard_value(self, data):
        """Check that a value may be assigned to a dataset or attribute.

        Raises:
            ValueError: If the value is the deletion marker, an overlay node
                (hard link) or a soft/external link object.
        """
        if _is_del_mark(data):
            raise ValueError(f"Value '{data}' is forbidden, cannot assign!")
        if isinstance(data, IH5Node):
            raise ValueError("Hard links are not supported, cannot assign!")
        if isinstance(data, (h5py.SoftLink, h5py.ExternalLink)):
            raise ValueError("SymLink and ExternalLink not supported, cannot assign!")

    @classmethod
    def _latest_idx(cls, files, path: str) -> Optional[int]:
        """Return index of newest file where the group/dataset was overwritten/created.

        Containers that do not contain the path at all are skipped.

        Returns None if not found or most recent value is a deletion mark.
        """
        idx = None
        for i in reversed(range(len(files))):
            if path not in files[i]:
                # this container does not touch the path -> look at older ones
                # (indexing a missing path would raise KeyError instead)
                continue
            node = files[i][path]
            if _node_is_del_mark(node):
                return None
            if _node_is_virtual(node):
                idx = i  # transparent node, keep looking in older containers
            else:
                return i  # some patch overrides the group
        return idx

    # path transformations

    def _parent_path(self) -> str:
        """Return path of the parent node (the root is its own parent)."""
        parent, _, _ = self._gpath.rpartition("/")
        # children of the root (and the root itself) yield an empty prefix
        return parent or "/"

    def _rel_path(self, path: str) -> str:
        """Return relative path based on node location, if passed path is absolute.

        If relative (or empty), returns the path back unchanged.

        Raises:
            RuntimeError: If the absolute path is not located at or below
                the path of this node.
        """
        if not path or path[0] != "/":
            return path
        # The prefix must end at a path segment boundary, otherwise e.g.
        # "/foobar" would wrongly be treated as located inside of "/foo".
        is_inside = (
            self._gpath == "/"
            or path == self._gpath
            or path.startswith(f"{self._gpath}/")
        )
        if not is_inside:
            raise RuntimeError("Invalid usage, cannot strip non-matching prefix!")
        # also skip the separating slash (the root path already ends with it)
        start_idx = len(self._gpath) + int(self._gpath != "/")
        return path[start_idx:]

    def _abs_path(self, path: str) -> str:
        """Return absolute path based on node location, if given path is relative.

        If absolute, returns the path back unchanged.
        """
        if path and path[0] == "/":
            return path
        pref = self._gpath if self._gpath != "/" else ""
        return f"{pref}/{path}"

    def _inspect_path(self, path):  # pragma: no cover
        """Print the path node of all containers where the path is contained in."""
        print(f"Path {path}:")
        for j, container in enumerate(self._files):
            if path in container:
                node = container[path]
                print(f"  idx={j}: {type(node).__name__}")
                if isinstance(node, h5py.Dataset):
                    print("    ", node[()])

188 

189 

class IH5InnerNode(IH5Node):
    """Shared implementation for the group and attribute manager overlays.

    Depending on the `attrs` flag, an instance grants access either to the
    child datasets/subgroups, or to the attributes attached to the
    group/dataset at a path in a record.
    """

    def __init__(
        self,
        record: IH5Record,
        gpath: str,
        creation_idx: int,
        attrs: bool = False,
    ):
        """See `IH5Node` constructor.

        An "overlay container" comes in two flavors - a group (h5py.Group)
        and a set of attributes (h5py.AttributeManager). Both are handled by
        this class (to avoid code duplication) and are told apart by the
        additional `attrs` flag.
        """
        super().__init__(record, gpath, creation_idx)
        # True -> behaves like an AttributeManager, False -> like a group
        self.__is_attrs__: bool = attrs

    @property
    def _is_attrs(self) -> bool:
        """Whether this node represents attributes (instead of a group)."""
        return self.__is_attrs__

    def _guard_key(self, key: str):
        """Validate a key used with bracket accessor notation.

        (e.g. used for `__getitem__, __setitem__, __delitem__`)

        Raises:
            ValueError: If the key is empty, contains '@' or characters
                rejected by the printable-ASCII pattern, or (for attributes)
                contains '/' or equals the substitution marker key.
        """
        if key == "":
            raise ValueError("Invalid empty path!")
        if "@" in key:  # used as attribute separator in the skeleton! TODO
            raise ValueError(f"Invalid symbol '@' in key: '{key}'!")
        if not re.match(r"^[!-~]+$", key):
            raise ValueError("Invalid key: Only printable ASCII is allowed!")
        if self._is_attrs:
            if "/" in key or key == SUBST_KEY:
                raise ValueError(f"Invalid attribute key: '{key}'!")

    def _get_child_raw(self, key: str, cidx: int) -> Any:
        """Return given child (dataset, group, attribute) from given container."""
        container = self._files[cidx]
        if self._is_attrs:
            return container[self._gpath].attrs[key]
        return container[self._abs_path(key)]

    def _get_child(self, key: str, cidx: int) -> Any:
        """Like _get_child_raw, but wraps the result with an overlay class if needed."""
        raw = self._get_child_raw(key, cidx)
        child_path = self._abs_path(key)
        if isinstance(raw, h5py.Group):
            return IH5Group(self._record, child_path, cidx)
        if isinstance(raw, h5py.Dataset):
            return IH5Dataset(self._record, child_path, cidx)
        return raw  # plain (attribute) value, nothing to wrap

    def _children(self) -> Dict[str, int]:
        """Return dict mapping from a child name to the most recent overriding patch idx.

        For datasets, dereferencing the child path in that container will give the data.
        For groups, the returned number is to be treated as the lower bound, i.e.
        the child creation_idx to recursively get the descendents.
        """
        self._guard_open()

        found: Dict[str, int] = {}
        virtual: Dict[str, bool] = {}
        # walk the containers from the newest down to the left boundary
        for cidx in range(len(self._files) - 1, self._cidx - 1, -1):
            container = self._files[cidx]
            if self._gpath not in container:
                continue

            node = container[self._gpath]
            entries = node.attrs if self._is_attrs else node
            assert isinstance(entries, (h5py.Group, h5py.AttributeManager))

            for name in entries.keys():
                if name not in found:
                    # first (i.e. most recent) occurrence of that name
                    virtual[name] = _node_is_virtual(self._get_child_raw(name, cidx))
                    found[name] = cidx
                elif virtual[name]:
                    # most recent version is transparent -> decrease lower bound
                    found[name] = min(found[name], cidx)

        # Assemble the result in alphabetical order, dropping deleted entries
        # and (for attributes) the special SUBST marker attribute.
        result: Dict[str, int] = {}
        for name in sorted(found):
            if self._is_attrs and name == SUBST_KEY:
                continue
            cidx = found[name]
            if _node_is_del_mark(self._get_child_raw(name, cidx)):
                continue
            result[name] = cidx
        return result

    def _get_children(self) -> List[Any]:
        """Get alphabetically ordered list of child nodes."""
        nodes: List[Any] = []
        for name, cidx in self._children().items():
            nodes.append(self._get_child(self._abs_path(name), cidx))
        return nodes

    def _node_seq(self, path: str) -> List[IH5Node]:
        """Return node sequence (one node per path prefix) to given path.

        Returns:
            Sequence starting with the current node (if path is relative)
            or the root node (if absolute) followed by all successive
            children along the requested path that exist.

        Raises:
            ValueError: If the path continues below a dataset.
        """
        curr: IH5InnerNode = IH5Group(self._record) if path[0] == "/" else self

        seq: List[IH5Node] = [curr]
        if path in ("/", "."):  # special case
            return seq

        # descend along the path, one segment at a time
        segs = path.strip("/").split("/")
        last_pos = len(segs) - 1
        for pos, seg in enumerate(segs):
            # find most recent container with that child
            child_cidx = curr._children().get(seg)
            if child_cidx is None:
                return seq  # not found -> return current prefix
            curr = curr._get_child(seg, child_cidx)  # proceed to child
            seq.append(curr)
            # catch invalid access, e.g. /foo is record, user accesses /foo/bar:
            if pos != last_pos and isinstance(curr, IH5Dataset):
                raise ValueError(f"Cannot access path inside a value: {curr._gpath}")
        return seq

    def _find(self, key: str) -> Optional[int]:
        """Return index of container holding that key (attribute or path), if any.

        Args:
            key: nonempty string (attribute, or relative/absolute path)

        Returns:
            Index >= 0 of most recent container patching that path if found, else None.
        """
        if self._is_attrs:  # attributes are always looked up "relative"
            return self._children().get(key, None)
        # paths (absolute or relative) are resolved segment by segment
        deepest = self._node_seq(key)[-1]
        if deepest._gpath != self._abs_path(key):
            return None
        return deepest._cidx

    # h5py-like interface

    def get(self, key: str, default=None):
        """Return the item at `key`, or `default` if lookup raises KeyError."""
        try:
            item = self[key]
        except KeyError:
            return default
        else:
            return item

    def __getitem__(self, key: str):
        """Return the child (or attribute value) at `key`.

        Raises:
            KeyError: If nothing is found at `key`.
        """
        self._guard_open()
        self._guard_key(key)
        cidx = self._find(key)
        if cidx is None:
            raise KeyError(key)
        return self._get_child(key, cidx)

    def _expect_real_item_idx(self, key: str) -> int:
        """Return container index of the item at `key`.

        Raises:
            KeyError: If the item is not found or is a deletion marker.
        """
        cidx = self._find(key)
        if cidx is None or _node_is_del_mark(self._get_child(key, cidx)):
            raise KeyError(f"Cannot delete '{key}', it does not exist!")
        return cidx

    def __contains__(self, key: str):
        """Return whether something can be found at `key`."""
        self._guard_key(key)
        return self._find(key) is not None

    def __iter__(self):
        """Iterate over the names of the children."""
        return iter(self._children())

    def __len__(self):
        """Return the number of children."""
        return len(self.keys())

    def keys(self):
        """Return the names of the children."""
        return self._children().keys()

    def _dict(self):
        """Return dict from child names to the (wrapped) children."""
        return {
            name: self._get_child(name, cidx)
            for name, cidx in self._children().items()
        }

    def values(self):
        """Return the (wrapped) children."""
        return self._dict().values()

    def items(self):
        """Return pairs of child names and (wrapped) children."""
        return self._dict().items()

387 

388 

class IH5Dataset(IH5Node):
    """`IH5Node` representing a `h5py.Dataset`, i.e. a leaf of the tree."""

    def __init__(self, files, gpath, creation_idx):
        """See `IH5Node` constructor (arguments are passed through unchanged)."""
        super().__init__(files, gpath, creation_idx)

    def copy_into_patch(self):
        """Copy the most recent value at this path into the current patch.

        This is useful e.g. for editing inside a complex value, such as an array.

        Raises:
            ValueError: If the record is not open or read-only, or if this
                node already comes from the latest container.
        """
        self._guard_open()
        self._guard_read_only()
        if self._cidx == self._last_idx:
            raise ValueError("Cannot copy, this node is already from latest patch!")
        # read the value from the older container, store it in the current patch
        value = self[()]
        self._files[-1][self._gpath] = value

    # h5py-like interface
    @property
    def name(self) -> str:
        """Absolute path of this dataset in the record."""
        return self._gpath

    @property
    def file(self) -> IH5Record:
        """Record this dataset belongs to."""
        return self._record

    @property
    def parent(self) -> IH5Group:
        """Node at the parent path, looked up via the record."""
        parent_path = self._parent_path()
        return self._record[parent_path]

    @property
    def attrs(self) -> IH5AttributeManager:
        """Attribute overlay for this dataset."""
        self._guard_open()
        return IH5AttributeManager(self._record, self._gpath, self._cidx)

    # this one is also needed to work with H5DatasetLike
    @property
    def ndim(self) -> int:
        """Number of dimensions, as reported by the underlying dataset."""
        raw = self._files[self._cidx][self._gpath]
        return raw.ndim  # type: ignore

    # For a dataset, the numpy data is indexed instead of paths. The patching
    # mechanism ends at this level, so indexing is just passed through to h5py.

    def __getitem__(self, key):
        """Index into the underlying dataset of the container this node is from."""
        self._guard_open()
        raw = self._files[self._cidx][self._gpath]
        return raw[key]  # type: ignore

    def __setitem__(self, key, val):
        """Write into the underlying dataset, if it is located in the latest patch.

        Raises:
            ValueError: If the record is not open or read-only, or if this
                node is not from the latest container.
        """
        self._guard_open()
        self._guard_read_only()
        if self._cidx != self._last_idx:
            raise ValueError(f"Cannot set '{key}', node is not from the latest patch!")
        # node is from the latest patch -> write as usual (pass through)
        raw = self._files[-1][self._gpath]
        raw[key] = val  # type: ignore

445 

446 

class IH5AttributeManager(IH5InnerNode):
    """`IH5Node` representing an `h5py.AttributeManager`."""

    def __init__(self, files, gpath, creation_idx):
        """See `IH5InnerNode` constructor (the `attrs` flag is always set)."""
        super().__init__(files, gpath, creation_idx, True)

    def __setitem__(self, key: str, val):
        """Set attribute `key` to `val` in the latest container.

        Raises:
            ValueError: If the record is not open or read-only, or if the
                key or the value is not acceptable.
        """
        self._guard_open()
        self._guard_read_only()
        self._guard_key(key)
        self._guard_value(val)

        latest = self._files[-1]
        if self._gpath not in latest:
            # path does not exist in current patch -> create "virtual node"
            latest.create_group(self._gpath)
        # A deletion marker at `key` (if set) is simply overwritten by the
        # assignment, so it does not need to be removed first.
        latest[self._gpath].attrs[key] = val

    def __delitem__(self, key: str):
        """Delete attribute `key`.

        The attribute is removed if it is found in the newest container. If
        the record has more than one container, the key is additionally
        marked as deleted in the newest container.

        Raises:
            ValueError: If the record is not open or read-only, or if the
                key is not acceptable.
            KeyError: If the attribute does not exist.
        """
        self._guard_open()
        self._guard_read_only()
        self._guard_key(key)

        found_idx = self._expect_real_item_idx(key)
        latest = self._files[-1]
        if found_idx == self._last_idx:
            del latest[self._gpath].attrs[key]
        if len(self._files) > 1:  # is a patch?
            if self._gpath not in latest:  # no node at path in latest?
                latest.create_group(self._gpath)  # create "virtual" node
            latest[self._gpath].attrs[key] = DEL_VALUE  # mark deleted

478 

479 

class IH5Group(IH5InnerNode):
    """`IH5Node` representing a `h5py.Group`."""

    def __init__(self, record, gpath: str = "/", creation_idx: Optional[int] = None):
        """See `IH5InnerNode` constructor.

        For the root group the creation index is always 0, for all other
        paths it must be provided.

        Raises:
            ValueError: If `creation_idx` is missing for a non-root path.
        """
        if gpath == "/":
            creation_idx = 0
        if creation_idx is None:
            raise ValueError("Need creation_idx for path != '/'!")
        super().__init__(record, gpath, creation_idx, False)

    def _require_node(self, name: str, node_type: Type[T]) -> Optional[T]:
        """Return the node at `name` if it is of the given type (or None if missing).

        Helper for `require_group` and `require_dataset`.

        Raises:
            TypeError: If something of a different type exists at `name`.
        """
        existing = self.get(name)
        if isinstance(existing, node_type):
            return existing
        if existing is None:
            return None
        msg = f"Incompatible object ({type(existing).__name__}) already exists"
        raise TypeError(msg)

    def _create_virtual(self, path: str) -> bool:
        """Make sure that a node exists at `path` in the most recent container.

        Returns:
            False if the overlay already resolves `path` to a node from the
            most recent container (nothing is done), True otherwise.
        """
        deepest = self._node_seq(path)[-1]
        path = self._abs_path(path)

        at_path = deepest._gpath == path
        deleted = _node_is_del_mark(deepest)
        if at_path and deepest._cidx == self._last_idx and not deleted:
            return False  # something at that path in most recent container exists

        # most recent entity is a deletion marker or not existing?
        if not at_path or deleted:
            suf_segs = deepest._rel_path(path).split("/")
            # create "overwrite" group in most recent patch...
            self.create_group(f"{deepest._gpath}/{suf_segs[0]}")
            # ... and create (nested) virtual group node(s), if needed
            if len(suf_segs) > 1:
                self._files[-1].create_group(path)

        return True

    # h5py-like interface

    def __setitem__(self, path: str, value):
        """Create a dataset at `path` holding `value` (see `create_dataset`)."""
        return self.create_dataset(path, data=value)

    def __delitem__(self, key: str):
        """Delete the group or dataset at `key`.

        The entity is removed if it is found in the newest container. If the
        record has more than one container, the path is additionally marked
        as deleted in the newest container.

        Raises:
            ValueError: If the record is not open or read-only, or if the
                key is not acceptable.
            KeyError: If nothing exists at `key`.
        """
        self._guard_open()
        self._guard_read_only()
        self._guard_key(key)
        self._expect_real_item_idx(key)

        path = self._abs_path(key)
        latest = self._files[-1]
        if path in latest:
            del latest[path]
        if len(self._files) > 1:  # has patches? mark deleted (instead of real delete)
            latest[path] = DEL_VALUE

    @property
    def name(self) -> str:
        """Absolute path of this group in the record."""
        return self._gpath

    @property
    def file(self):  # -> IH5Record
        """Record this group belongs to."""
        return self._record

    @property
    def parent(self) -> IH5Group:
        """Node at the parent path, looked up via the record."""
        parent_path = self._parent_path()
        return self._record[parent_path]

    @property
    def attrs(self) -> IH5AttributeManager:
        """Attribute overlay for this group."""
        self._guard_open()
        return IH5AttributeManager(self._record, self._gpath, self._cidx)

    def create_group(self, name: str) -> IH5Group:
        """Create a new group at `name` in the latest container.

        Raises:
            ValueError: If the record is not open or read-only, if the path
                leads through a dataset, or if the group already exists.
        """
        self._guard_open()
        self._guard_read_only()

        path = self._abs_path(name)
        deepest = self._node_seq(path)[-1]
        if not isinstance(deepest, IH5Group):
            raise ValueError(f"Cannot create group, {deepest._gpath} is a dataset!")
        if deepest._gpath == path:
            raise ValueError("Cannot create group, it already exists!")

        latest = self._files[-1]
        # remove "deleted" marker, if set at current path in current patch container
        if path in latest and _node_is_del_mark(latest[path]):
            del latest[path]
        # create group (or fail if something else exists there already)
        latest.create_group(path)
        # In a patch the intent is to "create" and not to update something, so
        # the group is marked as non-virtual, i.e. it "overwrites" with an empty group.
        if len(self._files) > 1:
            latest[path].attrs[SUBST_KEY] = h5py.Empty(None)

        return IH5Group(self._record, path, self._last_idx)

    def create_dataset(
        self, path: str, shape=None, dtype=None, data=None, **kwargs
    ) -> IH5Dataset:
        """Create a new dataset at `path` in the latest container.

        Only the keyword arguments `compression` and `compression_opts` are
        accepted in addition to the explicitly listed ones.

        Raises:
            ValueError: If the record is not open or read-only, if the key or
                value is not acceptable, if unknown keyword arguments are
                passed, or if a group or dataset already exists at `path`.
        """
        self._guard_open()
        self._guard_read_only()
        self._guard_key(path)
        self._guard_value(data)

        unknown_kwargs = set(kwargs) - {"compression", "compression_opts"}
        if unknown_kwargs:
            raise ValueError(f"Unkown kwargs: {unknown_kwargs}")

        path = self._abs_path(path)
        fidx = self._find(path)
        if fidx is not None:
            prev_val = self._get_child(path, fidx)
            if isinstance(prev_val, (IH5Group, IH5Dataset)):
                raise ValueError("Path exists, in order to replace - delete first!")

        latest = self._files[-1]
        if path in latest:
            if _node_is_del_mark(self._get_child_raw(path, self._last_idx)):
                # remove deletion marker in latest patch, if set
                del latest[path]
        else:
            # create path and overwrite-group in latest patch
            self._create_virtual(path)
            assert path in latest
            del latest[path]

        # actually create it, finally
        latest.create_dataset(path, shape=shape, dtype=dtype, data=data, **kwargs)
        return IH5Dataset(self._record, path, self._last_idx)

    def require_group(self, name: str) -> IH5Group:
        """Return the group at `name`, creating it if nothing exists there."""
        existing = self._require_node(name, IH5Group)
        if existing is not None:
            return existing
        return self.create_group(name)

    def require_dataset(self, name: str, *args, **kwds) -> IH5Dataset:
        """Return the dataset at `name`, creating it if nothing exists there.

        Extra arguments are only used for creation (see `create_dataset`).
        """
        existing = self._require_node(name, IH5Dataset)
        if existing is not None:
            # TODO: check dimensions etc, copy into patch if it fits
            return existing
        return self.create_dataset(name, *args, **kwds)

    def copy(self, source: CopySource, dest: CopyDest, **kwargs):
        """Copy a group or dataset to a new location (see `h5_copy_from_to`).

        Args:
            source: Path (looked up in this group) or node to be copied.
            dest: Target path, or target group node.
            kwargs: May contain `name`, the target name to use if `dest` is
                a group node (defaults to the last segment of the source
                name). All other arguments are passed to `h5_copy_from_to`.
        """
        src_node = self[source] if isinstance(source, str) else source
        name: str = kwargs.pop("name", src_node.name.split("/")[-1])

        dst_name: str
        if not isinstance(dest, str):
            # dest is a group node -> use inferred/passed name
            # (the lookup of "/" is an ugly workaround for treating files
            # as groups in the copy method)
            dst_group = dest["/"] if dest.name == "/" else dest
            dst_name = name
        else:
            # dest is a path -> ignore inferred/passed name
            *parent_segs, dst_name = self._abs_path(dest).split("/")
            dst_group = self.require_group("/".join(parent_segs) or "/")
        return h5_copy_from_to(src_node, cast(Any, dst_group), dst_name, **kwargs)

    def move(self, source: str, dest: str):
        """Move a group or dataset by copying it and deleting the source."""
        self.copy(source, dest)
        del self[source]

    def visititems(self, func: Callable[[str, object], Optional[Any]]) -> Any:
        """Call `func` with relative path and node of each descendant.

        The traversal stops once `func` returns something that is not None,
        and that value is returned.
        """
        self._guard_open()
        # children are pushed in reverse, so they are popped in original order
        pending = self._get_children()[::-1]
        while pending:
            curr = pending.pop()
            result = func(self._rel_path(curr._gpath), curr)
            if result is not None:
                return result
            if isinstance(curr, IH5Group):
                pending.extend(reversed(curr._get_children()))

    def visit(self, func: Callable[[str], Optional[Any]]) -> Any:
        """Like `visititems`, but `func` is only given the relative path."""

        def path_only(path, _node):
            return func(path)

        return self.visititems(path_only)

659 

660 

# Accepted kinds of copy source (annotation of `IH5Group.copy`):
# a path, or a group/dataset node (IH5 overlay or raw h5py).
CopySource = Union[str, IH5Group, IH5Dataset, h5py.Group, h5py.Dataset]
# Accepted kinds of copy destination (annotation of `IH5Group.copy`):
# a path, or a group node (IH5 overlay or raw h5py).
CopyDest = Union[str, IH5Group, h5py.Group]

663 

664 

665# ---- 

666# Helpers for IH5 / H5 interop (its all h5py at the bottom anyway, so its easy) 

667 

668 

class H5Type(str, Enum):
    """Kind of an entity inside a HDF5-like container.

    Only the kinds we care about are listed (various link types etc.
    are ignored).

    It is meant for wrappers around HDF5-like objects, which can use it
    instead of isinstance/subclass checks in order to implement
    duck-typing based decorator functionality that can
    work with (at least) raw HDF5, IH5 and IH5+Manifest.
    """

    group = "group"  # possibly nested, dict-like
    dataset = "dataset"  # = wrapped, indexable data
    attribute_set = "attribute-set"  # = not further nested, dict-like
    attribute = "attribute"  # = unwrapped data

    def __repr__(self) -> str:
        """Return representation of the form `<class name>.<value>`."""
        cls_name = type(self).__name__
        return ".".join((cls_name, self.value))

688 

689 

def h5_copy_from_to(
    source_node: Union[H5DatasetLike, H5GroupLike],
    target_group: H5GroupLike,
    target_path: str,
    **kwargs,
):
    """Copy a dataset or group from one container to a fresh location.

    This works also between HDF5 and IH5.

    Args:
        source_node: Group or dataset object to be copied.
        target_group: Existing group object to copy into.
        target_path: Fresh path, relative to the target group.
        kwargs: Supports `without_attrs` (do not copy attributes) and
            `shallow` (for groups, only copy the immediate children), both
            off by default. `expand_soft`, `expand_external` and
            `expand_refs` are accepted, but must not be false.

    Raises:
        ValueError: If an `expand_*` argument is false, unknown keyword
            arguments are passed, the target path is empty or absolute,
            or the target path already exists in the target group.
    """
    without_attrs: bool = kwargs.pop("without_attrs", False)
    shallow: bool = kwargs.pop("shallow", False)
    for ref_arg in ("expand_soft", "expand_external", "expand_refs"):
        if not kwargs.pop(ref_arg, True):
            raise ValueError("IH5 does not support keeping references!")
    if kwargs:
        raise ValueError(f"Unknown keyword arguments: {kwargs}")

    if not target_path or target_path[0] == "/":
        raise ValueError("Target path must be non-empty and relative!")
    if target_path in target_group:
        raise ValueError(f"Target path {target_path} already exists in target group!")

    def copy_attrs(src_node, trg_node):
        # transfer all attributes, unless the caller disabled it
        if without_attrs:
            return
        trg_attrs = trg_node.attrs
        for key, value in src_node.attrs.items():
            trg_attrs[key] = value

    if isinstance(source_node, H5DatasetLike):
        # a dataset is a leaf: copy value and dataset attributes, then done
        new_dataset = target_group.create_dataset(target_path, data=source_node[()])
        copy_attrs(source_node, new_dataset)
        return

    trg_root = target_group.create_group(target_path)
    copy_attrs(source_node, trg_root)  # copy source node attributes

    def copy_children(name, src_child):
        # name is relative to source root -> can be used below the target root
        if isinstance(src_child, H5DatasetLike):
            trg_root[name] = src_child[()]
        else:  # must be grouplike
            trg_root.create_group(name)
        copy_attrs(src_child, trg_root[name])

    if not shallow:  # recursive copy
        source_node.visititems(copy_children)
        return
    # shallow copy: only immediate children
    for child_name, child in source_node.items():
        copy_children(child_name, child)