Coverage for src/somesy/core/writer.py: 95%

198 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-10 14:56 +0000

1"""Project metadata writer base-class.""" 

2 

3import logging 

4from abc import ABC, abstractmethod 

5from pathlib import Path 

6from typing import Any, Dict, List, Optional, Union 

7 

8from somesy.core.models import Entity, Person, ProjectMetadata 

9 

# Logger registered under the fixed name "somesy" rather than this module's __name__.
10logger = logging.getLogger("somesy") 

11 

12 

class IgnoreKey:
    """Marker type: map a field to an instance of it to drop the field on output.

    Instances carry no state; only their type is checked (via `isinstance`).
    """

15 

16 

FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type of the dict passed as `direct_mappings`.

Each value is either a key path (list of keys) or an `IgnoreKey` marker.
"""

DictLike = Any
"""Stand-in type for dict-like objects (supporting getitem, setitem, delitem, etc.).

NOTE: This should probably be turned into a proper protocol.
"""

25 

26 

class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    Subclasses have to implement `_load`, `_validate`, `save`, `_from_person`
    and `_to_person`. The generic property accessors and the `sync` logic are
    provided here on top of `_get_property` and `_set_property`.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[FieldKeyMapping] = None,
        merge: Optional[bool] = False,
        pass_validation: Optional[bool] = False,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create a new target file if it does not exist.
                Defaults to False.
            direct_mappings: Dict with direct mappings of keys between somesy
                and target. Defaults to an empty mapping.
            merge: Merge the output file with an existing file. Defaults to False.
            pass_validation: Skip validation of an existing file after loading.
                Defaults to False.

        Raises:
            FileNotFoundError: If the file does not exist and
                `create_if_not_exists` is not set.

        """
        self._data: DictLike = {}
        self.path = path if isinstance(path, Path) else Path(path)
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}
        self.merge = merge
        self.pass_validation = pass_validation

        if self.path.is_file():
            self._load()
            if not self.pass_validation:
                self._validate()
        elif self.create_if_not_exists:
            # a freshly created file is loaded, but not validated
            self._init_new_file()
            self._load()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self) -> None:
        """Load the output file.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self) -> None:
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path] = None) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.

        Args:
            path: Target path. The meaning of `None` is defined by the
                implementing subclass (NOTE(review): presumably `self.path`
                is used then - confirm against the subclasses).

        """

    def _get_property(
        self,
        key: Union[str, List[str]],
        *,
        only_first: bool = False,
        remove: bool = False,
    ) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to retrieve the value.
            only_first: If True, returns only first entry if the value is a list.
            remove: If True, will remove the retrieved value and clean up the dict.

        Returns:
            The value found at the key path, or None if any key along the path
            is missing, if the key is an `IgnoreKey` marker, or if `only_first`
            is set and a list along the path is empty.

        """
        if isinstance(key, IgnoreKey):
            # ignored fields have no location in the data (mirrors _set_property)
            return None
        key_path = [key] if isinstance(key, str) else key

        curr = self._data
        seq = [curr]  # containers visited along the path, starting at the root
        for k in key_path:
            curr = curr.get(k)
            if only_first and isinstance(curr, list):
                # an empty list has no first entry -> treat it like a missing value
                curr = curr[0] if curr else None
            seq.append(curr)
            if curr is None:
                return None

        if remove:
            seq.pop()  # drop the leaf value, so seq[-1] is its container
            del seq[-1][key_path[-1]]  # remove leaf value
            # clean up the tree (bottom-up): drop containers that became empty
            parents = list(zip(key_path[:-1], seq[:-1]))
            for parent_key, container in reversed(parents):
                if not container.get(parent_key):
                    del container[parent_key]

        if only_first and isinstance(curr, list):
            # the leaf is itself a list (list nested in a list): unwrap once more
            return curr[0] if curr else None
        return curr

    def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
        """Set a property in the data.

        A falsy value (None, empty string, empty list, ...) removes the key
        instead, using `_get_property(..., remove=True)`. Missing intermediate
        keys along the path are created as empty dicts.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys locating the value.
                Nothing happens if it is an `IgnoreKey` marker.
            value: Value to store at the key path.

        """
        if isinstance(key, IgnoreKey):
            return
        key_path = [key] if isinstance(key, str) else key

        if not value:  # remove value and clean up the sub-dict
            self._get_property(key_path, remove=True)
            return

        # create path on the fly if needed
        curr = self._data
        for k in key_path[:-1]:
            if k not in curr:
                curr[k] = {}
            # re-read through the container instead of keeping the fresh dict
            curr = curr[k]

        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List[Union[Person, Entity]], new: List[Union[Person, Entity]]
    ) -> List[Union[Person, Entity]]:
        """Update metadata of a list of persons.

        People are matched using their `same_person` method.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: People currently listed in the target file.
            new: People from the somesy metadata.

        Returns:
            The people from `old` that are still listed in `new` (updated with
            the new data, in their old order), followed by all added people.

        """
        new_people = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.model_copy() for p in old]

        # try to match new people to existing old ones
        # (inefficient, but author list are not that long usually)
        for person_meta in new:
            person_update = person_meta.model_dump()
            person_existed = False
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.model_dump(
                    include=set(person_update.keys())
                )
                if person_update != overlapping_fields:
                    modified_people[i] = person.model_copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        logger.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        for person, kept in zip(old, still_exists):
            if not kept:
                logger.debug(f"Removing person\n{self._from_person(person)}")
        for person in new_people:
            logger.debug(f"Adding person\n{self._from_person(person)}")

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            person for person, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    def _sync_person_list(
        self, old: List[Any], new: List[Union[Person, Entity]]
    ) -> List[Union[Person, Entity]]:
        """Sync a list of persons with new metadata.

        Args:
            old (List[Any]): list of persons in format-specific representation
            new (List[Person]): list of persons in somesy representation

        Returns:
            List[Union[Person, Entity]]: updated list of persons in somesy
            representation (the property setters convert it back into the
            format-specific representation using `_from_person`)

        """
        old_people: List[Union[Person, Entity]] = self._parse_people(old)
        if not old_people:
            # nothing to merge into; never hand None on to the person setters
            return new or []
        if not new:
            return old_people
        return self._merge_person_metadata(old_people, new)

    def _sync_authors(self, metadata: ProjectMetadata) -> None:
        """Sync output file authors with authors from metadata.

        This method is existing for the publication_author special case
        when synchronizing to CITATION.cff.
        """
        current = self.authors  # evaluate the getter only once
        if not current:
            self.authors = metadata.authors()
        else:
            self.authors = self._sync_person_list(current, metadata.authors())

    def sync(self, metadata: ProjectMetadata) -> None:
        """Sync output file with other metadata files.

        Name, description, license and the URLs are always written (a missing
        URL removes the entry), version and keywords only if they are set.
        Authors and maintainers are merged with the people already listed.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self._sync_authors(metadata)
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        self.license = metadata.license.value

        self.homepage = str(metadata.homepage) if metadata.homepage else None
        self.repository = str(metadata.repository) if metadata.repository else None
        self.documentation = (
            str(metadata.documentation) if metadata.documentation else None
        )

    @staticmethod
    @abstractmethod
    def _from_person(person: Union[Person, Entity]) -> Any:
        """Convert a `Person` or `Entity` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> Union[Person, Entity]:
        """Convert an object representing a person into a `Person` or `Entity` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List[Union[Person, Entity]]:
        """Return a list of Persons and Entities parsed from list of format-specific people representations.

        `None` entries are skipped; passing `None` instead of a list yields
        an empty list.
        """
        return [cls._to_person(p) for p in (people or []) if p is not None]

    # ----
    # individual magic getters and setters

    def _get_key(self, key: str) -> Union[str, List[str], IgnoreKey]:
        """Return the entry of `direct_mappings` for a field, or the field name itself."""
        return self.direct_mappings.get(key) or key

    @property
    def name(self) -> Optional[str]:
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self) -> List[Any]:
        """Return the authors of the project (in format-specific representation)."""
        authors = self._get_property(self._get_key("authors"))
        if not authors:
            return []

        # only return authors that can be converted to Person
        return [author for author in authors if self._to_person(author) is not None]

    @authors.setter
    def authors(self, authors: List[Union[Person, Entity]]) -> None:
        """Set the authors of the project."""
        converted = [self._from_person(c) for c in authors or []]
        self._set_property(self._get_key("authors"), converted)

    @property
    def maintainers(self) -> List[Any]:
        """Return the maintainers of the project (in format-specific representation)."""
        maintainers = self._get_property(self._get_key("maintainers"))
        if maintainers is None:
            return []

        # only return maintainers that can be converted to Person
        return [
            maintainer
            for maintainer in maintainers
            if self._to_person(maintainer) is not None
        ]

    @maintainers.setter
    def maintainers(self, maintainers: List[Union[Person, Entity]]) -> None:
        """Set the maintainers of the project."""
        converted = [self._from_person(c) for c in maintainers or []]
        self._set_property(self._get_key("maintainers"), converted)

    @property
    def contributors(self) -> Optional[List[Any]]:
        """Return the contributors of the project.

        Unlike `authors` and `maintainers`, the stored value is returned
        as-is (None if there is none).
        """
        return self._get_property(self._get_key("contributors"))

    @contributors.setter
    def contributors(self, contributors: List[Union[Person, Entity]]) -> None:
        """Set the contributors of the project."""
        converted = [self._from_person(c) for c in contributors or []]
        self._set_property(self._get_key("contributors"), converted)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, value: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), value)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, value: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), value)

    @property
    def documentation(self) -> Optional[Union[str, dict]]:
        """Return the documentation url of the project."""
        return self._get_property(self._get_key("documentation"))

    @documentation.setter
    def documentation(self, value: Optional[Union[str, dict]]) -> None:
        """Set the documentation url of the project."""
        self._set_property(self._get_key("documentation"), value)