Coverage for src/metador_core/ih5/record.py: 93%
338 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
"""Immutable HDF5 container records.

A record consists of a base container and a number of patch containers.
This allows a record to work in settings where files are immutable, but still
provide a structured way of updating data stored inside.

Both base containers and patches are HDF5 files that are linked together
by some special attributes in the container root.
`IH5Record` is a class that wraps such a set of files. It features:

* support for record creation and updating
* automatic handling of the patch mechanism (i.e., creating/finding corresponding files)
* transparent access to data in the record (possibly spanning multiple files)
"""
14from __future__ import annotations
16import json
17import re
18from pathlib import Path
19from typing import Any, Dict, List, Literal, Optional, Tuple, Type, TypeVar, Union
20from uuid import UUID, uuid1
22import h5py
23from pydantic import BaseModel, Field, PrivateAttr
24from typing_extensions import Annotated, Final
26from ..schema.types import QualHashsumStr
27from ..util.hashsums import qualified_hashsum
28from ..util.types import OPEN_MODES, OpenMode
29from .overlay import IH5Group, h5_copy_from_to
# Marker written at the very start of every container file.
FORMAT_MAGIC_STR: Final[str] = "ih5_v01"
"""Magic value at the beginning of the file to detect that an HDF5 file is valid IH5."""

# Size of the HDF5 user block reserved in front of the actual HDF5 payload.
USER_BLOCK_SIZE: Final[int] = 1024
"""Space to reserve at beginning of each HDF5 file in bytes.

Must be a power of 2 and at least 512 (required by HDF5)."""

# Type variable used by the alternate constructors of `IH5Record` so that
# subclasses get their own type back.
T = TypeVar("T", bound="IH5Record")
def hashsum_file(filename: Path, skip_bytes: int = 0) -> str:
    """Compute hashsum of HDF5 file (ignoring the first `skip_bytes`).

    Args:
        filename: path of the file to hash
        skip_bytes: offset at which hashing starts (the stream is positioned
            there before it is handed to `qualified_hashsum`)

    Returns:
        Whatever `qualified_hashsum` returns for the remaining stream.
    """
    with open(filename, "rb") as stream:
        stream.seek(skip_bytes)
        digest = qualified_hashsum(stream)
    return digest
class IH5UserBlock(BaseModel):
    """IH5 metadata object parser and writer for the HDF5 user block.

    The user block stores technical administrative information linking together
    multiple HDF5 files that form a base container + patch sequence.

    The userblock begins with the magic format string, followed by a newline,
    followed by a string encoding the length of the user block, followed by a newline,
    followed by a serialized JSON object without newlines containing the metadata,
    followed by a NUL byte.
    """

    _userblock_size: int = PrivateAttr(default=USER_BLOCK_SIZE)
    """User block size claimed in the block itself (second line)."""

    record_uuid: UUID
    """UUID linking together multiple HDF5 files that form a (patched) record."""

    patch_index: Annotated[int, Field(ge=0)]
    """Index with the current revision number, i.e. the file is the n-th patch."""

    patch_uuid: UUID
    """UUID representing a certain state of the data in the record."""

    prev_patch: Optional[UUID]
    """UUID of the previous patch UUID (unless it is a base container, i.e. first one)."""

    hdf5_hashsum: Optional[QualHashsumStr] = None
    """Hashsum to verify integrity of the HDF5 data after the user block."""

    ub_exts: Dict[str, Any]
    """Any extra metadata to be stored in the user block, unvalidated in dicts.

    Subclasses must ensure that desired extra metadata is stored and loaded correctly.

    NOTE: In a merge of userblocks only newer extension section will be preserved!
    """

    @classmethod
    def create(cls, prev: Optional[IH5UserBlock] = None) -> IH5UserBlock:
        """Create a new user block for a base or patch container.

        If `prev` is None, will return a new base container block.
        Otherwise, will return a block linking back to the passed `prev` block.
        """
        ret = cls(
            patch_uuid=uuid1(),
            record_uuid=uuid1() if prev is None else prev.record_uuid,
            patch_index=0 if prev is None else prev.patch_index + 1,
            prev_patch=None if prev is None else prev.patch_uuid,
            ub_exts={},
        )
        return ret

    @classmethod
    def _read_head_raw(cls, stream, ub_size: int) -> Optional[Tuple[int, str]]:
        """Try reading user block.

        Args:
            stream: the open binary file stream
            ub_size: number of bytes to read

        Returns:
            (user block size claimed in block, embedded data until first NUL byte)
            or None, if block does not look right.
        """
        stream.seek(0)
        probe = stream.read(ub_size)
        try:
            text = probe.decode("utf-8")
        except UnicodeDecodeError:
            # binary content (e.g. an HDF5 file without user block) -> not ours
            return None

        lines = text.split("\n")
        if len(lines) != 3 or lines[0] != FORMAT_MAGIC_STR:
            return None
        try:
            claimed_size = int(lines[1])
        except ValueError:
            return None  # second line is not a number

        # Keep everything up to the first NUL byte. If the probe was too short
        # to contain the terminator, the (incomplete) payload is kept as-is
        # instead of silently losing its last character.
        payload = lines[2].split("\x00", 1)[0]
        return (claimed_size, payload)

    @classmethod
    def load(cls, filename: Path) -> IH5UserBlock:
        """Load a user block of the given HDF5 file.

        Raises:
            ValueError: if the file does not start with a well-formed IH5 user block.
        """
        invalid_msg = f"{filename}: it doesn't look like a valid IH5 file!"
        with open(filename, "rb") as f:
            # try smallest valid UB size first
            head = cls._read_head_raw(f, 512)
            if head is None:
                raise ValueError(invalid_msg)
            if head[0] > 512:  # if stored user block size is bigger, re-read
                head = cls._read_head_raw(f, head[0])
                if head is None:
                    raise ValueError(invalid_msg)

        ret = IH5UserBlock.parse_obj(json.loads(head[1]))
        ret._userblock_size = head[0]
        return ret

    def save(self, filename: Union[Path, str]):
        """Save this object in the user block of the given HDF5 file.

        Raises:
            ValueError: if the serialized block (plus terminating NUL byte) does
                not fit into the user block size recorded in this object, or if
                the file starts directly with the HDF5 signature, i.e. has no
                user block reserved.
        """
        filename = Path(filename)
        dat_str = f"{FORMAT_MAGIC_STR}\n{self._userblock_size}\n{self.json()}"
        data = dat_str.encode("utf-8")
        # data and the NUL terminator must stay inside the reserved user block,
        # otherwise the HDF5 payload behind it would be overwritten
        if len(data) >= self._userblock_size:
            msg = f"user block data too large ({len(data)} bytes), can't write!"
            raise ValueError(f"{filename}: {msg}")

        with open(filename, "r+b") as f:
            # check that this HDF file actually has a user block
            check = f.read(4)
            if check == b"\x89HDF":
                raise ValueError(f"{filename}: no user block reserved, can't write!")
            f.seek(0)
            f.write(data + b"\x00")  # NUL byte marks end of the data
class IH5Record(IH5Group):
    """Class representing a record, which consists of a collection of immutable files.

    One file is a base container (with no linked predecessor state),
    the remaining files are a linear sequence of patch containers.

    Runtime invariants to be upheld before/after each method call (after __init__):

    * all files of an instance are open for reading (until `close()` is called)
    * all files in `__files__` are in patch index order
    * at most one file is open in writable mode (if any, it is the last one)
    * modifications are possible only after `create_patch` was called
      and until `commit_patch` or `discard_patch` was called, and at no other time
    """

    # Characters that may appear in a record name.
    # (to be put into regex [..] symbol braces)
    _ALLOWED_NAME_CHARS = r"A-Za-z0-9\-"

    # filenames for a record named NAME are of the shape:
    # NAME[<PATCH_INFIX>.*]?<FILE_EXT>
    # NOTE: the first symbol of these must be one NOT in ALLOWED_NAME_CHARS!
    # This constraint is needed for correctly filtering filenames
    _PATCH_INFIX = ".p"
    _FILE_EXT = ".ih5"

    # core "wrapped" objects
    __files__: List[h5py.File]

    # attributes
    _closed: bool  # True after close()
    _allow_patching: bool  # false iff opened with "r"
    _ublocks: Dict[Path, IH5UserBlock]  # in-memory copy of HDF5 user blocks

    def __new__(cls, *args, **kwargs):
        """Allocate an instance with patching allowed and an empty file list."""
        ret = super().__new__(cls)
        ret._allow_patching = True
        ret.__files__ = []
        return ret

    def __eq__(self, o) -> bool:
        """Return whether `o` is an IH5 group backed by an equal list of files."""
        if not isinstance(o, IH5Group):
            return False
        return self._record._files == o._record._files

    @property
    def _has_writable(self):
        """Return True iff an uncommitted patch exists."""
        if not self.__files__:
            return False
        f = self.__files__[-1]
        return bool(f) and f.mode == "r+"

    @classmethod
    def _is_valid_record_name(cls, name: str) -> bool:
        """Return whether a record name is valid.

        A valid name is non-empty and consists only of the allowed characters.
        """
        # fullmatch instead of ^...$, because `$` would also accept a name
        # that ends with a newline character
        return re.fullmatch(f"[{cls._ALLOWED_NAME_CHARS}]+", name) is not None

    @classmethod
    def _base_filename(cls, record_path: Path) -> Path:
        """Given a record path, return path to canonical base container name."""
        return Path(f"{record_path}{cls._FILE_EXT}")

    @classmethod
    def _infer_name(cls, record_path: Path) -> str:
        """Return the file name cut off at the file extension and at the patch infix."""
        without_ext = record_path.name.split(cls._FILE_EXT)[0]
        return without_ext.split(cls._PATCH_INFIX)[0]

    def _next_patch_filepath(self) -> Path:
        """Compute filepath for the next patch based on the previous one."""
        base_path = Path(self.__files__[0].filename)
        patch_index = self._ublock(-1).patch_index + 1
        name = self._infer_name(base_path)
        return base_path.parent / f"{name}{self._PATCH_INFIX}{patch_index}{self._FILE_EXT}"

    def _ublock(self, obj: Union[h5py.File, int]) -> IH5UserBlock:
        """Return the parsed user block of a container file.

        `obj` is either an open file of this record or an index into `__files__`.
        """
        f: h5py.File = obj if isinstance(obj, h5py.File) else self.__files__[obj]
        return self._ublocks[Path(f.filename)]

    def _set_ublock(self, obj: Union[h5py.File, int], ub: IH5UserBlock):
        """Replace the in-memory user block of a container file (given as file or index)."""
        f: h5py.File = obj if isinstance(obj, h5py.File) else self.__files__[obj]
        self._ublocks[Path(f.filename)] = ub

    @classmethod
    def _new_container(cls, path: Path, ub: IH5UserBlock) -> h5py.File:
        """Initialize a fresh container file with reserved user block."""
        # create if does not exist, fail if it does
        f = h5py.File(path, mode="x", userblock_size=USER_BLOCK_SIZE)
        # close to pre-fill userblock
        f.close()
        ub.save(path)
        # reopen the container file
        return h5py.File(path, "r+")

    def _check_ublock(
        self,
        filename: Union[str, Path],
        ub: IH5UserBlock,
        prev: Optional[IH5UserBlock] = None,
        check_hashsum: bool = True,
    ):
        """Check given container file.

        If `prev` block is given, assumes that `ub` is from a patch container,
        otherwise from base container.

        Raises:
            ValueError: if the record UUID, the hashsum or the patch chain
                information is inconsistent.
        """
        filename = Path(filename)
        # check presence+validity of record uuid (should be the same for all)
        if ub.record_uuid != self.ih5_uuid:
            msg = "'record_uuid' inconsistent! Mixed up records?"
            raise ValueError(f"{filename}: {msg}")

        # hash must match with HDF5 content (i.e. integrity check)
        if check_hashsum and ub.hdf5_hashsum is None:
            msg = "hdf5_checksum is missing!"
            raise ValueError(f"{filename}: {msg}")
        if ub.hdf5_hashsum is not None:
            # a present hashsum is always verified, even if it was not required
            chksum = hashsum_file(filename, skip_bytes=USER_BLOCK_SIZE)
            if ub.hdf5_hashsum != chksum:
                msg = "file has been modified, stored and computed checksum are different!"
                raise ValueError(f"{filename}: {msg}")

        # check patch chain structure
        if prev is not None:
            if ub.patch_index <= prev.patch_index:
                msg = "patch container must have greater index than predecessor!"
                raise ValueError(f"{filename}: {msg}")
            if ub.prev_patch is None:
                msg = "patch must have an attribute 'prev_patch'!"
                raise ValueError(f"{filename}: {msg}")
            # claimed predecessor uuid must match with the predecessor by index
            # (can compare as strings directly, as we checked those already)
            if ub.prev_patch != prev.patch_uuid:
                msg = f"patch for {ub.prev_patch}, but predecessor is {prev.patch_uuid}"
                raise ValueError(f"{filename}: {msg}")

    def _expect_open(self):
        """Raise a ValueError if the record has been closed."""
        if self._closed:
            raise ValueError("Record is not open!")

    def _clear(self):
        """Clear all contents of the record."""
        # iterate over snapshots of the keys, as entries are deleted on the way
        for k in list(self.attrs.keys()):
            del self.attrs[k]
        for k in list(self.keys()):
            del self[k]

    def _is_empty(self) -> bool:
        """Return whether this record currently contains any data."""
        return not self.attrs.keys() and not self.keys()

    @classmethod
    def _create(cls: Type[T], record: Union[Path, str], truncate: bool = False) -> T:
        """Create a new record consisting of a base container.

        The base container is exposed as the `writable` container.

        If `truncate` is set and the base container file exists, all files found
        for that record are deleted first.
        """
        record = Path(record)  # in case it was a str
        if not cls._is_valid_record_name(record.name):
            raise ValueError(f"Invalid record name: '{record.name}'")
        path = cls._base_filename(record)

        # if overwrite flag is set, check and remove old record if present
        if truncate and path.is_file():
            cls.delete_files(record)

        # create new container
        ret = cls.__new__(cls)
        super().__init__(ret, ret)
        ret._closed = False

        ub = IH5UserBlock.create(prev=None)
        ret._ublocks = {path: ub}

        ret.__files__ = [cls._new_container(path, ub)]
        return ret

    @classmethod
    def _open(cls: Type[T], paths: List[Path], **kwargs) -> T:
        """Open a record consisting of a base container + possible set of patches.

        Expects a set of full file paths forming a valid record.
        Will throw an exception in case of a detected inconsistency.
        In that case all container files opened so far are closed again.

        Will open latest patch in writable mode if it lacks a hdf5 checksum.

        Keyword Args:
            allow_baseless: if true, the first container may have a `prev_patch`.
            reopen_incomplete_patch: if true, a last container without hashsum
                is reopened in writable mode.
        """
        if not paths:
            raise ValueError("Cannot open empty list of containers!")
        allow_baseless: bool = kwargs.pop("allow_baseless", False)
        reopen_incomplete: bool = kwargs.pop("reopen_incomplete_patch", False)

        ret = cls.__new__(cls)
        super().__init__(ret, ret)
        ret._closed = False

        ret._ublocks = {Path(path): IH5UserBlock.load(path) for path in paths}
        try:
            # files, sorted by patch index order (important!)
            # if something is wrong with the indices, this will throw an exception.
            for path in paths:
                ret.__files__.append(h5py.File(path, "r"))
            ret.__files__.sort(key=lambda f: ret._ublock(f).patch_index)
            # ----
            has_patches: bool = len(ret.__files__) > 1

            # check containers and relationship to each other:

            # check first container (it could be a base container and it has no predecessor)
            if not allow_baseless and ret._ublock(0).prev_patch is not None:
                msg = "base container must not have attribute 'prev_patch'!"
                raise ValueError(f"{ret.__files__[0].filename}: {msg}")
            ret._check_ublock(
                ret.__files__[0].filename, ret._ublock(0), None, has_patches
            )

            # check patches except last one (with checking the hashsum)
            for i in range(1, len(ret.__files__) - 1):
                filename = ret.__files__[i].filename
                ret._check_ublock(filename, ret._ublock(i), ret._ublock(i - 1), True)
            if has_patches:  # check latest patch (without checking hashsum)
                ret._check_ublock(
                    ret.__files__[-1].filename, ret._ublock(-1), ret._ublock(-2), False
                )

            # now check whether the last container (patch or base or whatever) has a checksum
            if ret._ublock(-1).hdf5_hashsum is None and reopen_incomplete:
                # if opening in writable mode, allow to complete the patch
                last = ret.__files__[-1]
                last_path = last.filename
                last.close()
                ret.__files__[-1] = h5py.File(Path(last_path), "r+")

            # additional sanity check: container uuids must be all distinct
            cn_uuids = {ret._ublock(f).patch_uuid for f in ret.__files__}
            if len(cn_uuids) != len(ret.__files__):
                raise ValueError("Some patch_uuid is not unique, invalid file set!")
        except Exception:
            # do not leak open file handles of a record that is never returned
            for f in ret.__files__:
                if f:  # skip a file that is already closed
                    f.close()
            ret.__files__ = []
            raise

        # all looks good
        return ret

    # ---- public attributes and interface ----

    @property
    def ih5_uuid(self) -> UUID:
        """Return the common record UUID of the set of containers."""
        return self._ublock(0).record_uuid

    @property
    def ih5_files(self) -> List[Path]:
        """List of container filenames this record consists of."""
        return [Path(f.filename) for f in self.__files__]

    @property
    def ih5_meta(self) -> List[IH5UserBlock]:
        """Return user block metadata, in container patch order."""
        return [self._ublock(f).copy() for f in self.__files__]

    @classmethod
    def find_files(cls, record: Path) -> List[Path]:
        """Return file names that look like they belong to the same record.

        This operation is based on purely syntactic pattern matching on file names
        that follow the default naming convention.

        Given a path `/foo/bar`, will find all containers in directory
        `/foo` whose name starts with `bar` followed by the correct file extension(s),
        such as `/foo/bar.ih5` and `/foo/bar.p1.ih5`.

        Raises:
            ValueError: if the last path component is not a valid record name.
        """
        record = Path(record)  # in case it was a str
        if not cls._is_valid_record_name(record.name):
            raise ValueError(f"Invalid record name: '{record.name}'")

        globstr = f"{record.name}*{cls._FILE_EXT}"  # rough wildcard pattern
        # filter out possible false positives (i.e. foobar* matching foo* as well)
        exact_name = re.compile(f"^{record.name}[^{cls._ALLOWED_NAME_CHARS}]")
        return [p for p in record.parent.glob(globstr) if exact_name.match(p.name)]

    @classmethod
    def list_records(cls, dir: Path) -> List[Path]:
        """Return paths of records found in the given directory.

        Will NOT recurse into subdirectories.

        This operation is based on purely syntactic pattern matching on file names
        that follow the default naming convention (i.e. just as `find_files`).

        Returned paths can be used as-is for opening the (supposed) record.

        Raises:
            ValueError: if `dir` is not a directory.
        """
        dir = Path(dir)  # in case it was a str
        if not dir.is_dir():
            raise ValueError(f"'{dir}' is not a directory")

        # a record name is the longest allowed-character prefix of a file name
        # that is followed by some character outside of the allowed set
        namepat = re.compile(
            f"[{cls._ALLOWED_NAME_CHARS}]+(?=[^{cls._ALLOWED_NAME_CHARS}])"
        )
        names = set()  # set, as a record may consist of multiple files
        for p in dir.glob(f"*{cls._FILE_EXT}"):
            if m := namepat.match(p.name):
                names.add(m[0])
        return [dir / name for name in names]

    def __init__(
        self, record: Union[str, Path, List[Path]], mode: OpenMode = "r", **kwargs
    ):
        """Open or create a record.

        This method uses `find_files` to infer the correct set of files syntactically.

        The open mode semantics are the same as for h5py.File.

        If the mode is 'r', then creating, committing or discarding patches is disabled.

        If the mode is 'a' or 'r+', then a new patch will be created in case the latest
        patch has already been committed.

        Raises:
            ValueError: for an unknown mode, for a list of files combined with a
                creating mode, or for an empty list of files.
            FileNotFoundError: if no files are found in mode 'r' or 'r+'.
        """
        super().__init__(self)

        # validate the mode first, so that it can be inspected safely below
        if mode not in OPEN_MODES:
            raise ValueError(f"Unknown file open mode: {mode}")
        creating = mode[0] == "w" or mode == "x"

        if isinstance(record, list):
            if creating:
                raise ValueError("Pass a prefix path for creating or overwriting!")
            if not record:
                # there is no path prefix to fall back to for finding files
                raise ValueError("Cannot open empty list of containers!")
            paths = record
        else:
            paths = None
            path: Path = Path(record)

        if creating:
            # create new or overwrite to get new
            ret = self._create(path, truncate=(mode == "w"))
            self.__dict__.update(ret.__dict__)
            return

        if mode == "a" or mode[0] == "r":
            if not paths:  # user passed a path prefix -> find files
                paths = self.find_files(path)  # type: ignore

            if not paths:  # no files were found
                if mode != "a":  # r/r+ need existing containers
                    raise FileNotFoundError(f"No files found for record: {path}")
                else:  # 'a' means create new if not existing (will be writable)
                    ret = self._create(path, truncate=False)
                    self.__dict__.update(ret.__dict__)
                    return

            # open existing (will be ro if everything is fine, writable if latest patch was uncommitted)
            want_rw = mode != "r"
            ret = self._open(paths, reopen_incomplete_patch=want_rw, **kwargs)
            self.__dict__.update(ret.__dict__)
            self._allow_patching = want_rw

            if want_rw and not self._has_writable:
                # latest patch was completed correctly -> make writable by creating new patch
                self.create_patch()

    @property
    def mode(self) -> Literal["r", "r+"]:
        """Return "r+" if patching is allowed for this record, otherwise "r"."""
        return "r+" if self._allow_patching else "r"

    def close(self, commit: bool = True) -> None:
        """Close all files that belong to this record.

        If there exists an uncommited patch, it will be committed
        (unless `commit` is set to false).

        After this, the object may not be used anymore.
        """
        if self._closed:
            return  # nothing to do

        if self._has_writable and commit:
            self.commit_patch()
        for f in self.__files__:
            f.close()
        self.__files__ = []
        self._closed = True

    def _expect_not_ro(self):
        """Raise a ValueError if the record is in read-only mode."""
        if self.mode == "r":
            raise ValueError("The container is opened as read-only!")

    def create_patch(self) -> None:
        """Create a new patch in order to update the record.

        Raises:
            ValueError: if the record is closed, read-only, or already has a
                writable container.
        """
        self._expect_open()
        self._expect_not_ro()
        if self._has_writable:
            raise ValueError("There already exists a writable container, commit first!")

        path = self._next_patch_filepath()
        ub = IH5UserBlock.create(prev=self._ublock(-1))
        self.__files__.append(self._new_container(path, ub))
        self._ublocks[path] = ub

    def _delete_latest_container(self) -> None:
        """Discard the current writable container (patch or base)."""
        cfile = self.__files__.pop()
        fn = cfile.filename
        del self._ublocks[Path(fn)]
        cfile.close()
        Path(fn).unlink()

    def discard_patch(self) -> None:
        """Discard the current incomplete patch container.

        Raises:
            ValueError: if the record is closed or read-only, if there is no
                writable container, or if the writable container is the base.
        """
        self._expect_open()
        self._expect_not_ro()
        if not self._has_writable:
            raise ValueError("No patch to discard!")
        if len(self.__files__) == 1:
            raise ValueError("Cannot discard base container! Just delete the file!")
            # reason: the base container provides record_uuid,
            # destroying it makes this object inconsistent / breaks invariants
            # so if this is done, it should not be used anymore.
        return self._delete_latest_container()

    def commit_patch(self, **kwargs) -> None:
        """Complete the current writable container (base or patch) for the record.

        Will perform checks on the new container and throw an exception on failure.

        After committing the patch is completed and cannot be edited anymore, so
        any further modifications must go into a new patch.
        """
        if kwargs:
            raise ValueError(f"Unknown keyword arguments: {kwargs}")

        self._expect_open()
        self._expect_not_ro()
        if not self._has_writable:
            raise ValueError("No patch to commit!")
        cfile = self.__files__[-1]
        filepath = Path(cfile.filename)
        cfile.close()  # must close it now, as we will write outside of HDF5 next

        # compute checksum, write user block
        chksum = hashsum_file(filepath, skip_bytes=USER_BLOCK_SIZE)
        self._ublocks[filepath].hdf5_hashsum = QualHashsumStr(chksum)
        self._ublocks[filepath].save(filepath)

        # reopen the container file now as read-only
        self.__files__[-1] = h5py.File(filepath, "r")

    def _fixes_after_merge(self, merged_file, ub):
        """Run hook for subclasses into merge process.

        The method is called after creating the merged container, but before
        updating its user block on disk.

        The passed userblock is a prepared userblock with updated HDF5 hashsum for the
        merged container and adapted prev_patch field, as will it be written to the file.
        Additional changes done to it in-place will be included.

        The passed filename can be used to perform additional necessary actions.
        """

    def merge_files(self, target: Path) -> Path:
        """Given a path with a record name, merge current record into new container.

        Returns new resulting container.

        Raises:
            ValueError: if the record is closed or has an uncommitted patch.
        """
        self._expect_open()
        if self._has_writable:
            raise ValueError("Cannot merge, please commit or discard your changes!")

        with type(self)(target, "x") as ds:
            source_node = self["/"]
            target_node = ds["/"]
            for k, v in source_node.attrs.items():  # copy root attributes
                target_node.attrs[k] = v
            for name in source_node.keys():  # copy each entity (will recurse)
                h5_copy_from_to(source_node[name], target_node, name)

            cfile = ds.ih5_files[0]  # store filename to override userblock afterwards

        # compute new merged userblock
        ub = self._ublock(-1).copy(update={"prev_patch": self._ublock(0).prev_patch})
        # update hashsum with saved new merged hdf5 payload
        chksum = hashsum_file(cfile, skip_bytes=USER_BLOCK_SIZE)
        ub.hdf5_hashsum = QualHashsumStr(chksum)

        self._fixes_after_merge(cfile, ub)  # for subclass hooks

        # NOTE(review): this replaces the in-memory user block of the LAST FILE OF
        # THIS record (not of the merged container) by the merged one - confirm
        # that this is intended.
        self._set_ublock(-1, ub)
        ub.save(cfile)
        return cfile

    @classmethod
    def delete_files(cls, record: Path):
        """Irreversibly(!) delete all containers matching the record path.

        This object is invalid after this operation.
        """
        for file in cls.find_files(record):
            file.unlink()

    def __repr__(self):
        """Return a short description showing the mode and the list of files."""
        return f"<IH5 record (mode {self.mode}) {self.__files__}>"

    # ---- context manager support (i.e. to use `with`) ----

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_traceback):
        # this will ensure that commit_patch() is called and the files are closed
        self.close()