Coverage for src/somesy/core/writer.py: 95%

181 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-04-30 09:42 +0000

1"""Project metadata writer base-class.""" 

2import logging 

3from abc import ABC, abstractmethod 

4from pathlib import Path 

5from typing import Any, Dict, List, Optional, Union 

6 

7from somesy.core.models import Person, ProjectMetadata 

8 

# Logger registered under the name "somesy"; used in this module for the
# debug output emitted while merging person metadata.
logger = logging.getLogger("somesy")

10 

11 

class IgnoreKey:
    """Special marker to be passed for dropping a key from serialization.

    An instance of this class may be used as a value in a `direct_mappings`
    dict (see `FieldKeyMapping`). `ProjectMetadataWriter._set_property`
    returns without writing anything when it is handed such a marker
    instead of a key or key path.
    """

14 

15 

# Maps a somesy field name (e.g. "name", "homepage") either to the list of
# keys forming the path to that field in the target format, or to an
# `IgnoreKey` marker.
FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type to be used for the dict passed as `direct_mappings`."""

# Deliberately loose alias: the writer only relies on `.get(...)`, item
# assignment, item deletion and `in` checks on these objects.
DictLike = Any
"""Dict-like that supports getitem, setitem, delitem, etc.

NOTE: This should be probably turned into a proper protocol.
"""

24 

25 

class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    Subclasses implement the file handling (`_load`, `_validate`, `save`)
    and the person conversion (`_from_person`, `_to_person`). This base class
    provides key-path based access to `self._data`, the property accessors
    built on top of it, and the `sync` logic.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[FieldKeyMapping] = None,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create a new target file if it does not
                exist yet. Defaults to False.
            direct_mappings: Dict with direct mappings of keys between somesy
                and target. Defaults to an empty mapping.

        Raises:
            FileNotFoundError: If the file does not exist and
                `create_if_not_exists` is falsy.
        """
        self._data: DictLike = {}
        self.path = path if isinstance(path, Path) else Path(path)
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}

        if self.path.is_file():
            self._load()
            self._validate()
        elif self.create_if_not_exists:
            # A freshly created file is loaded but not validated.
            self._init_new_file()
            self._load()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self) -> None:
        """Load the output file and validate it.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self) -> None:
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path] = None) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.

        Args:
            path: Target path to write to.
                NOTE(review): presumably `None` means "write back to
                `self.path`" -- confirm against the subclasses.
        """

    def _get_property(
        self,
        key: Union[str, List[str], IgnoreKey],
        *,
        only_first: bool = False,
        remove: bool = False,
    ) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to retrieve the
                value. An `IgnoreKey` marker always yields `None`.
            only_first: If True, returns only first entry if the value is a
                list. An empty list is treated like a missing value.
            remove: If True, will remove the retrieved value and clean up the dict.

        Returns:
            The value found at the key path, or `None` if any key along the
            path is missing.
        """
        # Fields mapped to IgnoreKey are not stored in the target at all,
        # so there is nothing to look up (mirrors `_set_property`).
        if isinstance(key, IgnoreKey):
            return None
        key_path = [key] if isinstance(key, str) else key

        curr = self._data
        seq = [curr]  # every container visited, root first
        for k in key_path:
            curr = curr.get(k)
            if only_first and isinstance(curr, list):
                # an empty list has no first entry -> handled as missing
                curr = curr[0] if curr else None
            seq.append(curr)
            if curr is None:
                return None

        if remove:
            seq.pop()  # drop the leaf value, seq[-1] is now its parent
            del seq[-1][key_path[-1]]  # remove leaf value
            # clean up the tree: walk back up and drop containers left empty
            for parent_key, dct in reversed(list(zip(key_path[:-1], seq[:-1]))):
                if not dct.get(parent_key):
                    del dct[parent_key]

        if only_first and isinstance(curr, list):
            return curr[0] if curr else None
        return curr

    def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
        """Set a property in the data.

        A falsy `value` removes the property (and cleans up sub-dicts that
        become empty) instead of storing it. Missing intermediate dicts along
        the key path are created on the fly.

        NOTE(review): an earlier version of this docstring stated that lists
        along the path are cleared out; this method does not do that itself --
        confirm whether that behavior is expected from overrides.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key, sequence of keys forming the path, or an
                `IgnoreKey` marker (in which case nothing is written).
            value: Value to store at the key path.
        """
        if isinstance(key, IgnoreKey):
            return
        key_path = [key] if isinstance(key, str) else key

        if not value:  # remove value and clean up the sub-dict
            self._get_property(key_path, remove=True)
            return

        # create path on the fly if needed
        curr = self._data
        for part in key_path[:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]

        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List[Person], new: List[Person]
    ) -> List[Person]:
        """Update metadata of a list of persons.

        People are matched using `Person.same_person`.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: Persons currently present in the target.
            new: Persons that should be present after the merge.

        Returns:
            The persons from `old` that are still listed in `new` (updated
            with the new data, original order kept), followed by the persons
            from `new` that had no match in `old`.
        """
        new_people = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.model_copy() for p in old]

        # try to match new people to existing old ones
        # (inefficient, but author list are not that long usually)
        for person_meta in new:
            person_update = person_meta.model_dump()
            person_existed = False
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.model_dump(
                    include=set(person_update.keys())
                )
                if person_update != overlapping_fields:
                    modified_people[i] = person.model_copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        logger.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        removed_people = [p for p, kept in zip(old, still_exists) if not kept]
        for person in removed_people:
            logger.debug(f"Removing person\n{self._from_person(person)}")
        for person in new_people:
            logger.debug(f"Adding person\n{self._from_person(person)}")

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            p for p, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    def _sync_person_list(self, old: List[Any], new: List[Person]) -> List[Any]:
        """Sync a list of persons with new metadata.

        Args:
            old (List[Any]): list of persons in format-specific representation
            new (List[Person]): list of persons in somesy representation

        Returns:
            List[Any]: merged list of persons, as produced by
                `_merge_person_metadata` from the parsed `old` list and `new`.
        """
        old_people: List[Person] = self._parse_people(old)
        return self._merge_person_metadata(old_people, new)

    def _sync_authors(self, metadata: ProjectMetadata) -> None:
        """Sync output file authors with authors from metadata.

        This method is existing for the publication_author special case
        when synchronizing to CITATION.cff.
        """
        self.authors = self._sync_person_list(self.authors, metadata.authors())

    def sync(self, metadata: ProjectMetadata) -> None:
        """Sync output file with other metadata files.

        Name, description, authors, maintainers, license and the URLs are
        always written; version and keywords only when set in `metadata`.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self._sync_authors(metadata)
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        self.license = metadata.license.value

        # URLs are stored as plain strings; an unset URL removes the field
        self.homepage = str(metadata.homepage) if metadata.homepage else None
        self.repository = str(metadata.repository) if metadata.repository else None
        self.documentation = (
            str(metadata.documentation) if metadata.documentation else None
        )

    @staticmethod
    @abstractmethod
    def _from_person(person: Person) -> Any:
        """Convert a `Person` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> Person:
        """Convert an object representing a person into a `Person` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List[Person]:
        """Return a list of Persons parsed from list of format-specific people representations.

        `None` (or an empty list) yields an empty list.
        """
        return [cls._to_person(person_obj) for person_obj in people or []]

    # ----
    # individual magic getters and setters

    def _get_key(self, key: str) -> Union[str, List[str], IgnoreKey]:
        """Return the target-specific key for a somesy field name.

        Looks up `key` in `direct_mappings` and falls back to the field name
        itself when there is no (truthy) mapping for it.
        """
        return self.direct_mappings.get(key) or key

    @property
    def name(self) -> Optional[str]:
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self):
        """Return the authors of the project (in format-specific representation)."""
        return self._get_property(self._get_key("authors"))

    @authors.setter
    def authors(self, authors: List[Person]) -> None:
        """Set the authors of the project."""
        converted = [self._from_person(c) for c in authors]
        self._set_property(self._get_key("authors"), converted)

    @property
    def maintainers(self):
        """Return the maintainers of the project (in format-specific representation)."""
        return self._get_property(self._get_key("maintainers"))

    @maintainers.setter
    def maintainers(self, maintainers: List[Person]) -> None:
        """Set the maintainers of the project."""
        converted = [self._from_person(c) for c in maintainers]
        self._set_property(self._get_key("maintainers"), converted)

    @property
    def contributors(self):
        """Return the contributors of the project (in format-specific representation)."""
        return self._get_property(self._get_key("contributors"))

    @contributors.setter
    def contributors(self, contributors: List[Person]) -> None:
        """Set the contributors of the project."""
        converted = [self._from_person(c) for c in contributors]
        self._set_property(self._get_key("contributors"), converted)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, value: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), value)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, value: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), value)

    @property
    def documentation(self) -> Optional[Union[str, dict]]:
        """Return the documentation url of the project."""
        return self._get_property(self._get_key("documentation"))

    @documentation.setter
    def documentation(self, value: Optional[Union[str, dict]]) -> None:
        """Set the documentation url of the project."""
        self._set_property(self._get_key("documentation"), value)