Coverage for src/metador_core/util/diff.py: 33%
147 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
1from __future__ import annotations
3import itertools
4from enum import Enum
5from pathlib import Path
6from typing import Any, Dict, Iterable, List, Optional, Union
8from pydantic import BaseModel
10from .hashsums import DirHashsums
def dir_paths(base_dir: Path) -> Iterable[Path]:
    """Return an iterator over everything below `base_dir`, relative to it.

    The tree is walked recursively and the found paths are sorted as soon as
    this function is called; the conversion into paths relative to `base_dir`
    happens lazily while the returned iterator is consumed.
    """
    entries = sorted(base_dir.rglob("*"))
    return (entry.relative_to(base_dir) for entry in entries)
class DiffNode(BaseModel):
    """Node representing a file, symlink or directory in a diff.

    A node stores the entity found at `path` before (`prev`) and after (`curr`)
    a change, plus its changed children grouped into `removed`, `modified` and
    `added`. Entities are `None` (nothing there), a string (file or symlink)
    or a dict (directory).
    """

    class Status(str, Enum):
        """Change that happened to a DiffNode."""

        removed = "-"
        modified = "~"
        added = "+"
        unchanged = "0"

    class ObjType(str, Enum):
        """Entities represented in a DiffNode."""

        directory = "d"
        file = "f"
        symlink = "s"

    path: Path
    """Location represented by this node."""

    prev: Union[None, str, Dict[str, Any]]
    """Previous entity at this location."""

    curr: Union[None, str, Dict[str, Any]]
    """Current entity at this location."""

    # NOTE: the `{}` defaults below rely on pydantic copying field defaults for
    # each new instance; `compare` mutates these dicts per node.
    removed: Dict[Path, DiffNode] = {}
    """Deleted files and subdirectories."""

    modified: Dict[Path, DiffNode] = {}
    """Modified or replaced files and subdirectories."""

    added: Dict[Path, DiffNode] = {}
    """New files and subdirectories."""

    def _type(self, entity) -> Optional[DiffNode.ObjType]:
        """Classify an entity value as stored in `prev` or `curr`.

        A dict is a directory. A non-empty string is a symlink if it starts
        with "symlink:", otherwise a file. Any other falsy value yields None.
        """
        if isinstance(entity, dict):
            return DiffNode.ObjType.directory
        if not entity:
            return None
        # startswith only inspects the prefix instead of searching the whole string
        if entity.startswith("symlink:"):
            return DiffNode.ObjType.symlink
        return DiffNode.ObjType.file

    @property
    def prev_type(self) -> Optional[DiffNode.ObjType]:
        """Kind of entity that was at this location before, if any."""
        return self._type(self.prev)

    @property
    def curr_type(self) -> Optional[DiffNode.ObjType]:
        """Kind of entity that is at this location now, if any."""
        return self._type(self.curr)

    def children(self) -> Iterable[DiffNode]:
        """Return immediate children in no specific order."""
        return itertools.chain(
            self.removed.values(), self.modified.values(), self.added.values()
        )

    def nodes(self) -> List[DiffNode]:
        """Return list of all nodes in this diff.

        The order will be recursively:
        removed children, modified children, itself, then added children.
        Within the same category the children are sorted in alphabetical order.
        """
        ret = []
        # None marks the position of this node itself between its child buckets
        buckets = [self.removed, self.modified, None, self.added]
        for b in buckets:
            if b is None:
                ret.append(self)
            else:
                for v in sorted(b.values(), key=lambda x: x.path):
                    ret += v.nodes()
        return ret

    def status(self) -> DiffNode.Status:
        """Return the kind of change this node represents.

        The result is `added` if there was no previous entity, `removed` if
        there is no current entity, and `modified` otherwise.
        """
        if self.prev is None:
            return DiffNode.Status.added
        elif self.curr is None:
            return DiffNode.Status.removed
        else:  # we can assume self.prev != self.curr
            return DiffNode.Status.modified

    @classmethod
    def compare(
        cls,
        prev: Optional[DirHashsums],
        curr: Optional[DirHashsums],
        path: Path,
    ) -> Optional[DiffNode]:
        """Compare two nested file and directory hashsum dicts.

        Args:
            prev: Previous entity at `path` (None, string or nested dict).
            curr: Current entity at `path` (None, string or nested dict).
            path: Location both entities belong to; child nodes get
                `path / key` as their location.

        Returns:
            None if no difference is found, otherwise a DiffNode tree
            containing only the additions, removals and changes.
        """
        if not isinstance(prev, dict) and not isinstance(curr, dict):
            # check equality first, so no model is built for unchanged entries
            if prev == curr:
                return None  # same (non-)file -> no diff
            return cls(path=path, prev=prev, curr=curr)  # a change happened

        ret = cls(path=path, prev=prev, curr=curr)

        if (prev is None or isinstance(prev, str)) and isinstance(curr, dict):
            # file -> dir: everything inside "added"
            for k, v in curr.items():
                kpath = ret.path / k
                d = cls.compare(None, v, kpath)
                assert d is not None
                ret.added[kpath] = d
            return ret

        if isinstance(prev, dict) and (curr is None or isinstance(curr, str)):
            # dir -> file: everything inside "removed"
            for k, v in prev.items():
                kpath = ret.path / k
                d = cls.compare(v, None, kpath)
                assert d is not None
                ret.removed[kpath] = d
            return ret

        assert isinstance(prev, dict) and isinstance(curr, dict)
        # two directories -> compare
        prev_keys = set(prev.keys())
        curr_keys = set(curr.keys())

        # Keys are iterated in sorted order so that the insertion order of the
        # child dicts does not depend on set iteration order.
        for k in sorted(curr_keys - prev_keys):  # added in curr
            kpath = ret.path / k
            d = cls.compare(None, curr[k], kpath)
            assert d is not None
            ret.added[kpath] = d
        for k in sorted(prev_keys - curr_keys):  # removed in curr
            kpath = ret.path / k
            d = cls.compare(prev[k], None, kpath)
            assert d is not None
            ret.removed[kpath] = d
        for k in sorted(prev_keys & curr_keys):  # possibly changed in curr
            kpath = ret.path / k
            diff = cls.compare(prev[k], curr[k], kpath)
            if diff is not None:  # add child if there is a difference
                ret.modified[kpath] = diff

        # all children same -> directories same
        same_dir = not ret.added and not ret.removed and not ret.modified
        if same_dir:
            return None

        # return dir node with the changes
        return ret
class DirDiff:
    """Interface to directory diffs based on comparing `DirHashsums`.

    An instance represents a change at the path.

    Granular change inspection is accessible through the provided methods.

    Typically, you will want to capture `dir_hashsums` of the same `dir`
    at two points in time and will be interested in the dict
    `DirDiff.compare(prev, curr).annotate(dir)`.
    """

    # Class-level default: a directly instantiated `DirDiff()` behaves like an
    # empty diff instead of raising AttributeError; `compare` sets the actual
    # root on the instance it returns.
    _diff_root: Optional[DiffNode] = None

    @property
    def is_empty(self) -> bool:
        """Whether this diff contains no changes (there is no root node)."""
        return self._diff_root is None

    @classmethod
    def compare(cls, prev: DirHashsums, curr: DirHashsums) -> DirDiff:
        """Compute a DirDiff based on two DirHashsum trees.

        To be meaningful, `prev` and `curr` should be trees obtained from the
        same directory at two points in time.
        """
        ret = cls.__new__(cls)
        ret._diff_root = DiffNode.compare(prev, curr, Path(""))
        return ret

    def get(self, path: Path) -> Optional[DiffNode]:
        """Return the diff node for a relative path, if there is one.

        Args:
            path: Relative path (a `Path`, or a string that is converted).
                The empty path refers to the root of the diff.

        Returns:
            The node located at `path`, or None if the diff is empty or
            no node exists for the path or one of its parent directories.
        """
        path = Path(path)  # if it was a str
        assert not path.is_absolute()
        if self._diff_root is None:
            return None

        curr: DiffNode = self._diff_root
        prefixes = [path] + list(path.parents)
        prefixes.pop()  # drop '.'
        # descend from the outermost parent directory down to the path itself
        for prefix in reversed(prefixes):
            node: Optional[DiffNode] = None
            # the child dicts are keyed by the child's path, so look the
            # prefix up directly instead of scanning all children
            for bucket in (curr.removed, curr.modified, curr.added):
                node = bucket.get(prefix)
                if node is not None:
                    break
            if node is None:
                return None
            curr = node
        return curr

    def status(self, node: Optional[DiffNode]) -> DiffNode.Status:
        """Return the type of change that happened to a diff node.

        Wraps `node.status()` additionally covering the case that `node` is `None`.
        Useful when processing possible path nodes returned by `annotate`.
        """
        if node is None:
            return DiffNode.Status.unchanged
        return node.status()

    def annotate(self, base_dir: Path) -> Dict[Path, Optional[DiffNode]]:
        """Return a dict of path -> DiffNode mappings based on passed directory.

        The keys are all paths that exist in this diff object as well as
        all paths that currently exist in the passed `base_dir`.
        The keys will all have `base_dir` as prefix.

        The values will be the corresponding DiffNodes, if there is a change.
        If there was no change at that path, the value will be None.
        If the diff is empty, an empty dict is returned.

        The iteration order will be such that for each entity in the diff,
        first the removed children, then changed subentities, then the entity
        itself, and finally added subentities are listed. This is useful for
        updating operations of directories and ensures that children can be
        deleted before their parents and parents can be created before their
        children. Unchanged paths follow after all diff entries, in sorted order.
        """
        if self._diff_root is None:
            return {}

        nodes = self._diff_root.nodes()  # nodes of all paths in the diff

        path_nodes = {node.path: node for node in nodes}
        # paths in base_dir, but not in the diff
        missing = sorted(set(dir_paths(base_dir)) - path_nodes.keys())

        # construct result, keeping correct order and prepending base_dir
        ret: Dict[Path, Optional[DiffNode]] = {
            base_dir / str(k): v for k, v in path_nodes.items()
        }
        for unchanged in missing:
            ret[base_dir / str(unchanged)] = None
        return ret