Coverage for src/metador_core/util/diff.py: 33%

147 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-02 09:33 +0000

1from __future__ import annotations 

2 

3import itertools 

4from enum import Enum 

5from pathlib import Path 

6from typing import Any, Dict, Iterable, List, Optional, Union 

7 

8from pydantic import BaseModel 

9 

10from .hashsums import DirHashsums 

11 

12 

13def dir_paths(base_dir: Path): 

14 """Recursively list all paths in given directory, relative to itself.""" 

15 return map(lambda p: p.relative_to(base_dir), sorted(base_dir.rglob("*"))) 

16 

17 

18class DiffNode(BaseModel): 

19 """Node representing a file, symlink or directory in a diff.""" 

20 

21 class Status(str, Enum): 

22 """Change that happened to a DiffNode.""" 

23 

24 removed = "-" 

25 modified = "~" 

26 added = "+" 

27 unchanged = "0" 

28 

29 class ObjType(str, Enum): 

30 """Entities represented in a DiffNode.""" 

31 

32 directory = "d" 

33 file = "f" 

34 symlink = "s" 

35 

36 path: Path 

37 """Location represented by this node.""" 

38 

39 prev: Union[None, str, Dict[str, Any]] 

40 """Previous entity at this location.""" 

41 

42 curr: Union[None, str, Dict[str, Any]] 

43 """Current entity at this location.""" 

44 

45 removed: Dict[Path, DiffNode] = {} 

46 """Deleted files and subdirectories.""" 

47 

48 modified: Dict[Path, DiffNode] = {} 

49 """Modified or replaced files and subdirectories.""" 

50 

51 added: Dict[Path, DiffNode] = {} 

52 """New files and subdirectories.""" 

53 

54 def _type(self, entity) -> Optional[DiffNode.ObjType]: 

55 if isinstance(entity, dict): 

56 return DiffNode.ObjType.directory 

57 elif entity: 

58 if entity.find("symlink:") == 0: 

59 return DiffNode.ObjType.symlink 

60 else: 

61 return DiffNode.ObjType.file 

62 return None 

63 

64 @property 

65 def prev_type(self) -> Optional[DiffNode.ObjType]: 

66 return self._type(self.prev) 

67 

68 @property 

69 def curr_type(self) -> Optional[DiffNode.ObjType]: 

70 return self._type(self.curr) 

71 

72 def children(self) -> Iterable[DiffNode]: 

73 """Return immediate children in no specific order.""" 

74 return itertools.chain( 

75 *(x.values() for x in [self.removed, self.modified, self.added]) 

76 ) 

77 

78 def nodes(self) -> List[DiffNode]: 

79 """Return list of all nodes in this diff. 

80 

81 The order will be recursively: 

82 removed children, modified children, itself, then added children. 

83 Within the same category the children are sorted in alphabetical order. 

84 """ 

85 ret = [] 

86 buckets = [self.removed, self.modified, None, self.added] 

87 for b in buckets: 

88 if b is None: 

89 ret.append(self) 

90 else: 

91 for v in sorted(b.values(), key=lambda x: x.path): 

92 ret += v.nodes() 

93 return ret 

94 

95 def status(self) -> DiffNode.Status: 

96 """Check the given path (which is assumed to be relative to this diff node). 

97 

98 Returns whether the path was added, removed or modified, respectively. 

99 """ 

100 if self.prev is None: 

101 return DiffNode.Status.added 

102 elif self.curr is None: 

103 return DiffNode.Status.removed 

104 else: # we can assume self.prev != self.curr 

105 return DiffNode.Status.modified 

106 

107 @classmethod 

108 def compare( 

109 cls, 

110 prev: Optional[DirHashsums], 

111 curr: Optional[DirHashsums], 

112 path: Path, 

113 ) -> Optional[DiffNode]: 

114 """Compare two nested file and directory hashsum dicts. 

115 

116 Returns None if no difference is found, otherwise a DiffNode tree 

117 containing only the additions, removals and changes. 

118 """ 

119 ret = cls(path=path, prev=prev, curr=curr) 

120 if not isinstance(prev, dict) and not isinstance(curr, dict): 

121 if prev == curr: 

122 return None # same (non-)file -> no diff 

123 else: 

124 return ret # indicates that a change happened 

125 

126 if (prev is None or isinstance(prev, str)) and isinstance(curr, dict): 

127 # file -> dir: everything inside "added" 

128 for k, v in curr.items(): 

129 kpath = ret.path / k 

130 d = cls.compare(None, v, kpath) 

131 assert d is not None 

132 ret.added[kpath] = d 

133 return ret 

134 

135 if isinstance(prev, dict) and (curr is None or isinstance(curr, str)): 

136 # dir -> file: everything inside "removed" 

137 for k, v in prev.items(): 

138 kpath = ret.path / k 

139 d = cls.compare(v, None, kpath) 

140 assert d is not None 

141 ret.removed[kpath] = d 

142 return ret 

143 

144 assert isinstance(prev, dict) and isinstance(curr, dict) 

145 # two directories -> compare 

146 prev_keys = set(prev.keys()) 

147 curr_keys = set(curr.keys()) 

148 

149 added = curr_keys - prev_keys 

150 removed = prev_keys - curr_keys 

151 intersection = (prev_keys | curr_keys) - added - removed 

152 

153 for k in added: # added in curr 

154 kpath = ret.path / k 

155 d = cls.compare(None, curr[k], kpath) 

156 assert d is not None 

157 ret.added[kpath] = d 

158 for k in removed: # removed in curr 

159 kpath = ret.path / k 

160 d = cls.compare(prev[k], None, kpath) 

161 assert d is not None 

162 ret.removed[kpath] = d 

163 for k in intersection: # changed in curr 

164 kpath = ret.path / k 

165 diff = cls.compare(prev[k], curr[k], kpath) 

166 if diff is not None: # add child if there is a difference 

167 ret.modified[kpath] = diff 

168 

169 # all children same -> directories same 

170 same_dir = not ret.added and not ret.removed and not ret.modified 

171 if same_dir: 

172 return None 

173 

174 # return dir node with the changes 

175 return ret 

176 

177 

178class DirDiff: 

179 """Interface to directory diffs based on comparing `DirHashsums`. 

180 

181 An instance represents a change at the path. 

182 

183 Granular change inspection is accessible through the provided methods. 

184 

185 Typically, you will want to capture `dir_hashsums` of the same `dir` 

186 at two points in time and will be interested in the dict 

187 `DirDiff.compare(prev, curr).annotate(dir)`. 

188 """ 

189 

190 _diff_root: Optional[DiffNode] 

191 

192 @property 

193 def is_empty(self): 

194 return self._diff_root is None 

195 

196 @classmethod 

197 def compare(cls, prev: DirHashsums, curr: DirHashsums) -> DirDiff: 

198 """Compute a DirDiff based on two DirHashsum trees. 

199 

200 To be meaningful, `prev` and `curr` should be trees obtained from the 

201 same directory at two points in time. 

202 """ 

203 ret = cls.__new__(cls) 

204 ret._diff_root = DiffNode.compare(prev, curr, Path("")) 

205 return ret 

206 

207 def get(self, path: Path) -> Optional[DiffNode]: 

208 path = Path(path) # if it was a str 

209 assert not path.is_absolute() 

210 if self._diff_root is None: 

211 return None 

212 curr: DiffNode = self._diff_root 

213 prefixes = [path] + list(path.parents) 

214 prefixes.pop() # drop '.' 

215 while prefixes: 

216 path = prefixes.pop() 

217 node: Optional[DiffNode] = next( 

218 (x for x in curr.children() if x.path == path), None 

219 ) 

220 if node is None: 

221 return None 

222 curr = node 

223 return curr 

224 

225 def status(self, node: Optional[DiffNode]) -> DiffNode.Status: 

226 """Return the type of change that happened to a diff node. 

227 

228 Wraps `node.status()` additionally covering the case that `node` is `None`. 

229 Useful when processing possible path nodes returned by `annotate`. 

230 """ 

231 if node is None: 

232 return DiffNode.Status.unchanged 

233 return node.status() 

234 

235 def annotate(self, base_dir: Path) -> Dict[Path, Optional[DiffNode]]: 

236 """Return a dict of path -> status mappings based on passed directory. 

237 

238 The keys are all paths that exist in this diff object as well as 

239 all paths that currently exist in the passed `base_dir`. 

240 The keys will all have `base_dir` as prefix. 

241 

242 The values will be the corresponding DiffNodes, if there is a change. 

243 If there was no change at that path, the value will be None. 

244 

245 The iteration order will be such that for each entity, first the removed 

246 children, then changed subentities, then the entity itself, and finally 

247 added subentities are listed. This is useful for updating operations 

248 of directories and ensures that children can be deleted before their parents 

249 and parents can be created before their children. 

250 """ 

251 if self._diff_root is None: 

252 return {} 

253 

254 nodes = self._diff_root.nodes() # nodes of all paths in the diff 

255 

256 path_nodes = {node.path: node for node in nodes} 

257 # paths in base_dir, but not in the diff 

258 missing = sorted(set(dir_paths(base_dir)) - set(path_nodes.keys())) 

259 

260 # construct result, keeping correct order and prepending base_dir 

261 ret: Dict[Path, Optional[DiffNode]] = { 

262 base_dir / str(k): v for k, v in path_nodes.items() 

263 } 

264 for path in missing: 

265 ret[base_dir / str(path)] = None 

266 return ret