Coverage for src/somesy/core/writer.py: 96%

204 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-14 13:01 +0000

1"""Project metadata writer base-class.""" 

2 

3import logging 

4from abc import ABC, abstractmethod 

5from pathlib import Path 

6from typing import Any, Dict, List, Optional, Union 

7 

8from somesy.core.models import Entity, Person, ProjectMetadata 

9 

# Module logger, registered under the fixed name "somesy" (not `__name__`).
logger = logging.getLogger("somesy")

11 

12 

class IgnoreKey:
    """Marker type: use an instance as a key to drop a field from serialization.

    `ProjectMetadataWriter._set_property` returns without writing anything
    when the key it receives is an instance of this class.
    """

15 

16 

FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type of the dict passed as `direct_mappings`.

Maps a field name to a key path (list of keys) or to an `IgnoreKey` marker.
"""

DictLike = Any
"""Placeholder type for dict-like containers (getitem, setitem, delitem, etc.).

NOTE: This should probably be turned into a proper protocol.
"""

25 

26 

27class ProjectMetadataWriter(ABC): 

28 """Base class for Project Metadata Output Wrapper. 

29 

30 All supported output formats are implemented as subclasses. 

31 """ 

32 

33 def __init__( 

34 self, 

35 path: Path, 

36 *, 

37 create_if_not_exists: Optional[bool] = False, 

38 direct_mappings: FieldKeyMapping = None, 

39 merge: Optional[bool] = False, 

40 pass_validation: Optional[bool] = False, 

41 ) -> None: 

42 """Initialize the Project Metadata Output Wrapper. 

43 

44 Use the `direct_mappings` dict to define 

45 format-specific location for certain fields, 

46 if no additional processing is needed that 

47 requires a customized setter. 

48 

49 Args: 

50 path: Path to target output file. 

51 create_if_not_exists: Create an empty CFF file if not exists. Defaults to True. 

52 direct_mappings: Dict with direct mappings of keys between somesy and target 

53 merge: Merge the output file with an existing file. Defaults to False. 

54 pass_validation: Pass validation for all output files. Defaults to False. 

55 

56 """ 

57 self._data: DictLike = {} 

58 self.path = path if isinstance(path, Path) else Path(path) 

59 self.create_if_not_exists = create_if_not_exists 

60 self.direct_mappings = direct_mappings or {} 

61 self.merge = merge 

62 self.pass_validation = pass_validation 

63 if self.path.is_file(): 

64 self._load() 

65 if not self.pass_validation: 

66 self._validate() 

67 else: 

68 if self.create_if_not_exists: 

69 self._init_new_file() 

70 self._load() 

71 else: 

72 raise FileNotFoundError(f"The file {self.path} does not exist.") 

73 

74 def _init_new_file(self) -> None: 

75 """Create an new suitable target file. 

76 

77 Override to initialize file with minimal contents, if needed. 

78 Make sure to set `self._data` to match the contents. 

79 """ 

80 self.path.touch() 

81 

82 @abstractmethod 

83 def _load(self): 

84 """Load the output file and validate it. 

85 

86 Implement this method so that it loads the file `self.path` 

87 into the `self._data` dict. 

88 

89 The file is guaranteed to exist. 

90 """ 

91 

92 @abstractmethod 

93 def _validate(self) -> None: 

94 """Validate the target file data. 

95 

96 Implement this method so that it checks 

97 the validity of the metadata (relevant to somesy) 

98 in that file and raises exceptions on failure. 

99 """ 

100 

101 @abstractmethod 

102 def save(self, path: Optional[Path]) -> None: 

103 """Save the output file to the given path. 

104 

105 Implement this in a way that will carefully 

106 update the target file with new metadata 

107 without destroying its other contents or structure. 

108 """ 

109 

110 def _get_property( 

111 self, 

112 key: Union[str, List[str]], 

113 *, 

114 only_first: bool = False, 

115 remove: bool = False, 

116 ) -> Optional[Any]: 

117 """Get a property from the data. 

118 

119 Override this to e.g. rewrite the retrieved key 

120 (e.g. if everything relevant is in some subobject). 

121 

122 Args: 

123 key: Name of the key or sequence of multiple keys to retrieve the value. 

124 only_first: If True, returns only first entry if the value is a list. 

125 remove: If True, will remove the retrieved value and clean up the dict. 

126 

127 """ 

128 key_path = [key] if isinstance(key, str) else key 

129 

130 curr = self._data 

131 seq = [curr] 

132 for k in key_path: 

133 curr = curr.get(k) 

134 curr = curr[0] if isinstance(curr, list) and only_first else curr 

135 seq.append(curr) 

136 if curr is None: 

137 return None 

138 

139 if remove: 

140 seq.pop() 

141 del seq[-1][key_path[-1]] # remove leaf value 

142 # clean up the tree 

143 for key, dct in reversed(list(zip(key_path[:-1], seq[:-1]))): 

144 if not dct.get(key): 

145 del dct[key] 

146 

147 if isinstance(curr, list) and only_first: 

148 return curr[0] 

149 return curr 

150 

def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
    """Set a property in the data.

    An `IgnoreKey` key makes this a no-op. A falsy value removes the
    entry (and cleans up emptied parents) through `_get_property`.
    Otherwise missing intermediate dicts are created along the key path.

    NOTE(review): an earlier note said lists along the path are cleared
    out; this code does not special-case lists - confirm the intent.

    Override this to e.g. rewrite the retrieved key
    (e.g. if everything relevant is in some subobject).
    """
    if isinstance(key, IgnoreKey):
        return

    key_path = key if not isinstance(key, str) else [key]

    if value:
        # walk down to the parent of the leaf, creating the path on the fly
        node = self._data
        for segment in key_path[:-1]:
            if segment not in node:
                node[segment] = {}
            node = node[segment]
        node[key_path[-1]] = value
    else:
        # remove value and clean up the sub-dict
        self._get_property(key_path, remove=True)

175 

176 # ---- 

177 # special handling for person metadata 

178 

179 def _merge_person_metadata( 

180 self, old: List[Union[Person, Entity]], new: List[Union[Person, Entity]] 

181 ) -> List[Union[Person, Entity]]: 

182 """Update metadata of a list of persons. 

183 

184 Will identify people based on orcid, email or full name. 

185 

186 If old list has same person listed multiple times, 

187 the resulting list will too (we cannot correctly merge for external formats.) 

188 """ 

189 new_people = [] # list for new people (e.g. added authors) 

190 # flag, meaning "person was not removed" 

191 still_exists = [False for i in range(len(old))] 

192 # copies of old person data, to be modified 

193 modified_people = [p.model_copy() for p in old] 

194 

195 # try to match new people to existing old ones 

196 # (inefficient, but author list are not that long usually) 

197 for person_meta in new: 

198 person_update = person_meta.model_dump() 

199 person_existed = False 

200 for i in range(len(modified_people)): 

201 person = modified_people[i] 

202 if not person.same_person(person_meta): 

203 continue 

204 

205 # not new person (-> will not append new record) 

206 person_existed = True 

207 # still exists (-> will not be removed from list) 

208 still_exists[i] = True 

209 

210 # if there were changes -> update person 

211 overlapping_fields = person.model_dump( 

212 include=set(person_update.keys()) 

213 ) 

214 if person_update != overlapping_fields: 

215 modified_people[i] = person.model_copy(update=person_update) 

216 

217 # show effective update in debug log 

218 old_fmt = self._from_person(person) 

219 new_fmt = self._from_person(modified_people[i]) 

220 if old_fmt != new_fmt: 

221 logger.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}") 

222 

223 if not person_existed: 

224 new_people.append(person_meta) 

225 

226 # show added and removed people in debug log 

227 removed_people = [old[i] for i in range(len(old)) if not still_exists[i]] 

228 for person in removed_people: 

229 logger.debug(f"Removing person\n{self._from_person(person)}") 

230 for person in new_people: 

231 logger.debug(f"Adding person\n{self._from_person(person)}") 

232 

233 # return updated list of (still existing) people, 

234 # and all new people coming after them. 

235 existing_modified = [ 

236 modified_people[i] for i in range(len(old)) if still_exists[i] 

237 ] 

238 return existing_modified + new_people 

239 

240 def _sync_person_list( 

241 self, old: List[Any], new: List[Union[Person, Entity]] 

242 ) -> List[Any]: 

243 """Sync a list of persons with new metadata. 

244 

245 Args: 

246 old (List[Any]): list of persons in format-specific representation 

247 new (List[Person]): list of persons in somesy representation 

248 

249 Returns: 

250 List[Any]: updated list of persons in format-specific representation 

251 

252 """ 

253 old_people: List[Union[Person, Entity]] = self._parse_people(old) 

254 

255 # check if people are unique 

256 def filter_unique( 

257 people: List[Union[Person, Entity]], 

258 ) -> List[Union[Person, Entity]]: 

259 """Filter out duplicate people from a list.""" 

260 if people is None or len(people) == 0: 

261 return [] 

262 

263 unique_people: List[Union[Person, Entity]] = [] 

264 # use same_person method to check if people are unique 

265 for person in people: 

266 if not any(person.same_person(p) for p in unique_people): 

267 unique_people.append(person) 

268 

269 return unique_people 

270 

271 old_people_unique = filter_unique(old_people) 

272 new_people_unique = filter_unique(new) 

273 

274 return self._merge_person_metadata(old_people_unique, new_people_unique) 

275 

276 def _sync_authors(self, metadata: ProjectMetadata) -> None: 

277 """Sync output file authors with authors from metadata. 

278 

279 This method is existing for the publication_author special case 

280 when synchronizing to CITATION.cff. 

281 """ 

282 if self.authors is None or len(self.authors) == 0: 

283 self.authors = metadata.authors() 

284 else: 

285 self.authors = self._sync_person_list(self.authors, metadata.authors()) 

286 

287 def sync(self, metadata: ProjectMetadata) -> None: 

288 """Sync output file with other metadata files.""" 

289 self.name = metadata.name 

290 self.description = metadata.description 

291 

292 if metadata.version: 

293 self.version = metadata.version 

294 

295 if metadata.keywords: 

296 self.keywords = metadata.keywords 

297 

298 self._sync_authors(metadata) 

299 self.maintainers = self._sync_person_list( 

300 self.maintainers, metadata.maintainers() 

301 ) 

302 

303 self.license = metadata.license.value 

304 

305 self.homepage = str(metadata.homepage) if metadata.homepage else None 

306 self.repository = str(metadata.repository) if metadata.repository else None 

307 self.documentation = ( 

308 str(metadata.documentation) if metadata.documentation else None 

309 ) 

310 

311 @staticmethod 

312 @abstractmethod 

313 def _from_person(person: Union[Person, Entity]) -> Any: 

314 """Convert a `Person` or `Entity` object into suitable target format.""" 

315 

316 @staticmethod 

317 @abstractmethod 

318 def _to_person(person_obj: Any) -> Union[Person, Entity]: 

319 """Convert an object representing a person into a `Person` or `Entity` object.""" 

320 

321 @classmethod 

322 def _parse_people(cls, people: Optional[List[Any]]) -> List[Union[Person, Entity]]: 

323 """Return a list of Persons and Entities parsed from list of format-specific people representations.""" 

324 # remove None values 

325 people = [p for p in people if p is not None] 

326 

327 people = list(map(lambda p: cls._to_person(p), people or [])) 

328 return people 

329 

330 # ---- 

331 # individual magic getters and setters 

332 

333 def _get_key(self, key): 

334 """Get a key itself.""" 

335 return self.direct_mappings.get(key) or key 

336 

337 @property 

338 def name(self): 

339 """Return the name of the project.""" 

340 return self._get_property(self._get_key("name")) 

341 

342 @name.setter 

343 def name(self, name: str) -> None: 

344 """Set the name of the project.""" 

345 self._set_property(self._get_key("name"), name) 

346 

347 @property 

348 def version(self) -> Optional[str]: 

349 """Return the version of the project.""" 

350 return self._get_property(self._get_key("version")) 

351 

352 @version.setter 

353 def version(self, version: str) -> None: 

354 """Set the version of the project.""" 

355 self._set_property(self._get_key("version"), version) 

356 

357 @property 

358 def description(self) -> Optional[str]: 

359 """Return the description of the project.""" 

360 return self._get_property(self._get_key("description")) 

361 

362 @description.setter 

363 def description(self, description: str) -> None: 

364 """Set the description of the project.""" 

365 self._set_property(self._get_key("description"), description) 

366 

367 @property 

368 def authors(self): 

369 """Return the authors of the project.""" 

370 authors = self._get_property(self._get_key("authors")) 

371 if authors is None or len(authors) == 0: 

372 return [] 

373 

374 # only return authors that can be converted to Person 

375 authors_validated = [ 

376 author for author in authors if self._to_person(author) is not None 

377 ] 

378 return authors_validated 

379 

380 @authors.setter 

381 def authors(self, authors: List[Union[Person, Entity]]) -> None: 

382 """Set the authors of the project.""" 

383 authors = [self._from_person(c) for c in authors] 

384 self._set_property(self._get_key("authors"), authors) 

385 

386 @property 

387 def maintainers(self): 

388 """Return the maintainers of the project.""" 

389 maintainers = self._get_property(self._get_key("maintainers")) 

390 if maintainers is None: 

391 return [] 

392 

393 # only return maintainers that can be converted to Person 

394 maintainers_validated = [ 

395 maintainer 

396 for maintainer in maintainers 

397 if self._to_person(maintainer) is not None 

398 ] 

399 return maintainers_validated 

400 

401 @maintainers.setter 

402 def maintainers(self, maintainers: List[Union[Person, Entity]]) -> None: 

403 """Set the maintainers of the project.""" 

404 maintainers = [self._from_person(c) for c in maintainers] 

405 self._set_property(self._get_key("maintainers"), maintainers) 

406 

407 @property 

408 def contributors(self): 

409 """Return the contributors of the project.""" 

410 return self._get_property(self._get_key("contributors")) 

411 

412 @contributors.setter 

413 def contributors(self, contributors: List[Union[Person, Entity]]) -> None: 

414 """Set the contributors of the project.""" 

415 contributors = [self._from_person(c) for c in contributors] 

416 self._set_property(self._get_key("contributors"), contributors) 

417 

418 @property 

419 def keywords(self) -> Optional[List[str]]: 

420 """Return the keywords of the project.""" 

421 return self._get_property(self._get_key("keywords")) 

422 

423 @keywords.setter 

424 def keywords(self, keywords: List[str]) -> None: 

425 """Set the keywords of the project.""" 

426 self._set_property(self._get_key("keywords"), keywords) 

427 

428 @property 

429 def license(self) -> Optional[str]: 

430 """Return the license of the project.""" 

431 return self._get_property(self._get_key("license")) 

432 

433 @license.setter 

434 def license(self, license: Optional[str]) -> None: 

435 """Set the license of the project.""" 

436 self._set_property(self._get_key("license"), license) 

437 

438 @property 

439 def homepage(self) -> Optional[str]: 

440 """Return the homepage url of the project.""" 

441 return self._get_property(self._get_key("homepage")) 

442 

443 @homepage.setter 

444 def homepage(self, value: Optional[str]) -> None: 

445 """Set the homepage url of the project.""" 

446 self._set_property(self._get_key("homepage"), value) 

447 

448 @property 

449 def repository(self) -> Optional[Union[str, dict]]: 

450 """Return the repository url of the project.""" 

451 return self._get_property(self._get_key("repository")) 

452 

453 @repository.setter 

454 def repository(self, value: Optional[Union[str, dict]]) -> None: 

455 """Set the repository url of the project.""" 

456 self._set_property(self._get_key("repository"), value) 

457 

458 @property 

459 def documentation(self) -> Optional[Union[str, dict]]: 

460 """Return the documentation url of the project.""" 

461 return self._get_property(self._get_key("documentation")) 

462 

463 @documentation.setter 

464 def documentation(self, value: Optional[Union[str, dict]]) -> None: 

465 """Set the documentation url of the project.""" 

466 self._set_property(self._get_key("documentation"), value)