Coverage for src/somesy/core/writer.py: 95%

187 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-29 07:42 +0000

1"""Project metadata writer base-class.""" 

2 

3import logging 

4from abc import ABC, abstractmethod 

5from pathlib import Path 

6from typing import Any, Dict, List, Optional, Union 

7 

8from somesy.core.models import Person, ProjectMetadata 

9 

# Package-wide logger; writers emit their debug output (e.g. person updates) here.
logger = logging.getLogger("somesy")

11 

12 

class IgnoreKey:
    """Marker type: map a field to an instance of this class to skip it.

    Instances carry no data; writers only check for them via `isinstance`
    in order to drop the corresponding key from serialization.
    """

15 

16 

FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type of the dict passed as `direct_mappings`.

Maps a field name to either a key path (list of keys) or an `IgnoreKey` marker.
"""

DictLike = Any
"""Dict-like object that supports getitem, setitem, delitem, etc.

NOTE: This should probably be turned into a proper protocol.
"""

25 

26 

class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    Subclasses have to implement `_load`, `_validate`, `save`,
    `_from_person` and `_to_person`. The generic property getters and
    setters defined here operate on the dict-like `self._data`.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[FieldKeyMapping] = None,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create a new target file if it does not exist. Defaults to False.
            direct_mappings: Dict with direct mappings of keys between somesy and the target format.

        Raises:
            FileNotFoundError: If the file does not exist and `create_if_not_exists` is falsy.

        """
        self._data: DictLike = {}
        self.path = path if isinstance(path, Path) else Path(path)
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}

        if self.path.is_file():
            self._load()
            self._validate()
        elif self.create_if_not_exists:
            # a freshly created file is loaded, but not validated
            self._init_new_file()
            self._load()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self):
        """Load the output file.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self):
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path] = None) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.
        """

    def _get_property(
        self,
        key: Union[str, List[str], IgnoreKey],
        *,
        only_first: bool = False,
        remove: bool = False,
    ) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to retrieve the value.
                An `IgnoreKey` marker always yields `None`.
            only_first: If True, returns only first entry if the value is a list.
            remove: If True, will remove the retrieved value and clean up the dict.

        Returns:
            The value found at the key path, or `None` if some key along
            the path is missing (or the key is ignored).

        """
        # ignored fields have no location in the target file
        # (mirrors the handling in `_set_property`)
        if isinstance(key, IgnoreKey):
            return None
        key_path = [key] if isinstance(key, str) else key

        curr = self._data
        seq = [curr]  # all containers visited along the path, root first
        for k in key_path:
            curr = curr.get(k)
            if only_first and isinstance(curr, list):
                # an empty list has no first entry -> treat it like a missing value
                curr = curr[0] if curr else None
            seq.append(curr)
            if curr is None:
                return None

        if remove:
            seq.pop()  # drop the leaf value, now seq[-1] is its parent container
            del seq[-1][key_path[-1]]  # remove leaf value
            # clean up the tree (deepest container first): drop emptied sub-dicts
            for parent_key, container in reversed(
                list(zip(key_path[:-1], seq[:-1]))
            ):
                if not container.get(parent_key):
                    del container[parent_key]

        if isinstance(curr, list) and only_first:
            return curr[0] if curr else None
        return curr

    def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
        """Set a property in the data.

        A falsy value removes the key (and cleans up emptied sub-dicts),
        an `IgnoreKey` marker makes this a no-op. Missing intermediate
        containers along the key path are created as empty dicts.

        NOTE(review): the path is assumed to consist of dict-like
        containers; behaviour with lists along the path should be confirmed.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).
        """
        if isinstance(key, IgnoreKey):
            return
        key_path = [key] if isinstance(key, str) else key

        if not value:  # remove value and clean up the sub-dict
            self._get_property(key_path, remove=True)
            return

        # create path on the fly if needed
        curr = self._data
        for part in key_path[:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]

        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List[Person], new: List[Person]
    ) -> List[Person]:
        """Update metadata of a list of persons.

        People are matched using `Person.same_person`.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: persons currently listed in the target file
            new: persons as they should be listed after the update

        Returns:
            Updated copies of the old persons that are still listed in `new`
            (in their old order), followed by all persons only present in `new`.

        """
        new_people = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.model_copy() for p in old]

        # try to match new people to existing old ones
        # (inefficient, but author list are not that long usually)
        for person_meta in new:
            person_update = person_meta.model_dump()
            person_existed = False
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.model_dump(
                    include=set(person_update.keys())
                )
                if person_update != overlapping_fields:
                    modified_people[i] = person.model_copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        logger.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        removed_people = [p for p, kept in zip(old, still_exists) if not kept]
        for person in removed_people:
            logger.debug(f"Removing person\n{self._from_person(person)}")
        for person in new_people:
            logger.debug(f"Adding person\n{self._from_person(person)}")

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            p for p, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    def _sync_person_list(self, old: List[Any], new: List[Person]) -> List[Person]:
        """Sync a list of persons with new metadata.

        Args:
            old (List[Any]): list of persons in format-specific representation
            new (List[Person]): list of persons in somesy representation

        Returns:
            List[Person]: merged list of persons in somesy representation
                (the person setters convert them to the target format)

        """
        old_people: List[Person] = self._parse_people(old)
        return self._merge_person_metadata(old_people, new)

    def _sync_authors(self, metadata: ProjectMetadata) -> None:
        """Sync output file authors with authors from metadata.

        This method is existing for the publication_author special case
        when synchronizing to CITATION.cff.
        """
        self.authors = self._sync_person_list(self.authors, metadata.authors())

    def sync(self, metadata: ProjectMetadata) -> None:
        """Sync output file with other metadata files.

        Version and keywords are only written if they are set in `metadata`;
        all other fields are always passed on to their setters.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self._sync_authors(metadata)
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        self.license = metadata.license.value

        self.homepage = str(metadata.homepage) if metadata.homepage else None
        self.repository = str(metadata.repository) if metadata.repository else None
        self.documentation = (
            str(metadata.documentation) if metadata.documentation else None
        )

    @staticmethod
    @abstractmethod
    def _from_person(person: Person) -> Any:
        """Convert a `Person` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> Person:
        """Convert an object representing a person into a `Person` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List[Person]:
        """Return a list of Persons parsed from list of format-specific people representations."""
        return list(map(cls._to_person, people or []))

    # ----
    # individual magic getters and setters

    def _get_key(self, key: str) -> Union[str, List[str], IgnoreKey]:
        """Return the format-specific location of a field.

        Falls back to the field name itself if `direct_mappings`
        has no (truthy) entry for it.
        """
        return self.direct_mappings.get(key) or key

    @property
    def name(self):
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self):
        """Return the authors of the project (empty list if there are none)."""
        authors = self._get_property(self._get_key("authors"))
        if authors is None:
            return []

        # only return authors that can be converted to Person
        authors_validated = [
            author for author in authors if self._to_person(author) is not None
        ]
        return authors_validated

    @authors.setter
    def authors(self, authors: List[Person]) -> None:
        """Set the authors of the project."""
        authors = [self._from_person(c) for c in authors]
        self._set_property(self._get_key("authors"), authors)

    @property
    def maintainers(self):
        """Return the maintainers of the project (empty list if there are none)."""
        maintainers = self._get_property(self._get_key("maintainers"))
        if maintainers is None:
            return []

        # only return maintainers that can be converted to Person
        maintainers_validated = [
            maintainer
            for maintainer in maintainers
            if self._to_person(maintainer) is not None
        ]
        return maintainers_validated

    @maintainers.setter
    def maintainers(self, maintainers: List[Person]) -> None:
        """Set the maintainers of the project."""
        maintainers = [self._from_person(c) for c in maintainers]
        self._set_property(self._get_key("maintainers"), maintainers)

    @property
    def contributors(self):
        """Return the contributors of the project (`None` if there are none)."""
        return self._get_property(self._get_key("contributors"))

    @contributors.setter
    def contributors(self, contributors: List[Person]) -> None:
        """Set the contributors of the project."""
        contributors = [self._from_person(c) for c in contributors]
        self._set_property(self._get_key("contributors"), contributors)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, value: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), value)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, value: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), value)

    @property
    def documentation(self) -> Optional[Union[str, dict]]:
        """Return the documentation url of the project."""
        return self._get_property(self._get_key("documentation"))

    @documentation.setter
    def documentation(self, value: Optional[Union[str, dict]]) -> None:
        """Set the documentation url of the project."""
        self._set_property(self._get_key("documentation"), value)