Coverage for src/somesy/core/writer.py: 96%
204 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-14 13:02 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-14 13:02 +0000
1"""Project metadata writer base-class."""
3import logging
4from abc import ABC, abstractmethod
5from pathlib import Path
6from typing import Any, Dict, List, Optional, Union
8from somesy.core.models import Entity, Person, ProjectMetadata
# Module-wide logger registered under the name "somesy"; the writer emits its
# debug messages about added/removed/updated people through it.
logger = logging.getLogger("somesy")
class IgnoreKey:
    """Marker type: map a field to an instance of this to leave it out of serialization."""
# Maps a somesy field name either to the key path (list of nested keys) under
# which the value lives in the target format, or to an `IgnoreKey` marker.
FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type to be used for the dict passed as `direct_mappings`."""

# Deliberately loose alias: the writer accesses the loaded data through
# `.get(...)`, item assignment, item deletion and `in` checks.
DictLike = Any
"""Dict-like that supports getitem, setitem, delitem, etc.

NOTE: This should be probably turned into a proper protocol.
"""
class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    Subclasses must implement `_load`, `_validate`, `save`, `_from_person`
    and `_to_person`. The generic getters and setters defined here operate on
    the dict-like `self._data` and use `direct_mappings` to translate somesy
    field names into format-specific keys or key paths.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[FieldKeyMapping] = None,
        merge: Optional[bool] = False,
        pass_validation: Optional[bool] = False,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create a new (empty) target file if it does not exist. Defaults to False.
            direct_mappings: Dict with direct mappings of keys between somesy and target.
            merge: Merge the output file with an existing file. Defaults to False.
            pass_validation: Skip validation of the loaded file. Defaults to False.

        Raises:
            FileNotFoundError: If the file does not exist and `create_if_not_exists` is falsy.

        """
        self._data: DictLike = {}
        self.path = path if isinstance(path, Path) else Path(path)
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}
        self.merge = merge
        self.pass_validation = pass_validation

        if self.path.is_file():
            self._load()
            if not self.pass_validation:
                self._validate()
        elif self.create_if_not_exists:
            # a freshly created file is loaded, but not validated
            self._init_new_file()
            self._load()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self):
        """Load the output file and validate it.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self) -> None:
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path]) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.
        """

    def _get_property(
        self,
        key: Union[str, List[str], IgnoreKey],
        *,
        only_first: bool = False,
        remove: bool = False,
    ) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to retrieve the value.
                An `IgnoreKey` marker always yields None.
            only_first: If True, returns only first entry if the value is a list.
            remove: If True, will remove the retrieved value and clean up the dict.

        Returns:
            The value found at the key path, or None if the path cannot be
            resolved (missing key, non-mapping value before the end of the
            path, or an empty list when `only_first` is set).

        """
        # ignored fields have no location in the target format
        if isinstance(key, IgnoreKey):
            return None
        key_path = [key] if isinstance(key, str) else key

        curr = self._data
        seq = [curr]  # containers visited along the path, root first
        for k in key_path:
            getter = getattr(curr, "get", None)
            if getter is None:
                # reached a scalar/list before the path was exhausted
                return None
            curr = getter(k)
            if only_first and isinstance(curr, list):
                # an empty list has no first entry -> treat as missing
                curr = curr[0] if curr else None
            if curr is None:
                return None
            seq.append(curr)

        if remove:
            seq.pop()  # drop the leaf itself, seq[-1] is now its parent
            del seq[-1][key_path[-1]]  # remove leaf value
            # clean up the tree: walk upwards and drop containers left empty
            for parent_key, parent in reversed(list(zip(key_path[:-1], seq[:-1]))):
                if not parent.get(parent_key):
                    del parent[parent_key]

        if only_first and isinstance(curr, list):
            # first entry was itself a list -> unwrap once more
            return curr[0] if curr else None
        return curr

    def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
        """Set a property in the data.

        A falsy `value` removes the property (and any parent containers left
        empty) instead of storing it. Missing intermediate keys are created
        as plain dicts.

        NOTE(review): existing intermediate values are assumed to be
        dict-like; a list along the path is not handled here.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key, sequence of nested keys, or an `IgnoreKey`
                marker (in which case nothing is written).
            value: Value to store.

        """
        if isinstance(key, IgnoreKey):
            return
        key_path = [key] if isinstance(key, str) else key

        if not value:  # remove value and clean up the sub-dict
            self._get_property(key_path, remove=True)
            return

        # create path on the fly if needed
        curr = self._data
        for part in key_path[:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]

        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List[Union[Person, Entity]], new: List[Union[Person, Entity]]
    ) -> List[Union[Person, Entity]]:
        """Update metadata of a list of persons.

        Will identify people based on orcid, email or full name.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: People currently present in the target file.
            new: People as given by the somesy metadata.

        Returns:
            The (possibly updated) old people that still appear in `new`,
            in their original order, followed by the people only found in `new`.

        """
        new_people = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.model_copy() for p in old]

        # try to match new people to existing old ones
        # (inefficient, but author list are not that long usually)
        for person_meta in new:
            person_update = person_meta.model_dump()
            person_existed = False
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.model_dump(include=set(person_update))
                if person_update != overlapping_fields:
                    modified_people[i] = person.model_copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        logger.debug("Updating person\n%s\nto\n%s", old_fmt, new_fmt)

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        for person, kept in zip(old, still_exists):
            if not kept:
                logger.debug("Removing person\n%s", self._from_person(person))
        for person in new_people:
            logger.debug("Adding person\n%s", self._from_person(person))

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            person for person, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    @staticmethod
    def _filter_unique_people(
        people: Optional[List[Union[Person, Entity]]],
    ) -> List[Union[Person, Entity]]:
        """Return `people` without duplicates, keeping the first occurrence of each.

        Two entries count as duplicates if `same_person` says so.
        None or an empty list yields an empty list.
        """
        unique_people: List[Union[Person, Entity]] = []
        for person in people or []:
            if not any(person.same_person(p) for p in unique_people):
                unique_people.append(person)
        return unique_people

    def _sync_person_list(
        self, old: List[Any], new: List[Union[Person, Entity]]
    ) -> List[Any]:
        """Sync a list of persons with new metadata.

        Both lists are de-duplicated before merging.

        Args:
            old (List[Any]): list of persons in format-specific representation
            new (List[Person]): list of persons in somesy representation

        Returns:
            List[Any]: merged list of persons, as produced by
            `_merge_person_metadata`

        """
        old_people: List[Union[Person, Entity]] = self._parse_people(old)
        return self._merge_person_metadata(
            self._filter_unique_people(old_people),
            self._filter_unique_people(new),
        )

    def _sync_authors(self, metadata: ProjectMetadata) -> None:
        """Sync output file authors with authors from metadata.

        This method is existing for the publication_author special case
        when synchronizing to CITATION.cff.
        """
        current = self.authors  # evaluate the property only once
        if not current:
            self.authors = metadata.authors()
        else:
            self.authors = self._sync_person_list(current, metadata.authors())

    def sync(self, metadata: ProjectMetadata) -> None:
        """Sync output file with other metadata files.

        Name, description, license and the URL fields are always written
        (an empty value removes the field); version and keywords are only
        written when present in `metadata`.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self._sync_authors(metadata)
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        self.license = metadata.license.value

        self.homepage = str(metadata.homepage) if metadata.homepage else None
        self.repository = str(metadata.repository) if metadata.repository else None
        self.documentation = (
            str(metadata.documentation) if metadata.documentation else None
        )

    @staticmethod
    @abstractmethod
    def _from_person(person: Union[Person, Entity]) -> Any:
        """Convert a `Person` or `Entity` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> Union[Person, Entity]:
        """Convert an object representing a person into a `Person` or `Entity` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List[Union[Person, Entity]]:
        """Return a list of Persons and Entities parsed from list of format-specific people representations.

        None entries are skipped; a `people` argument of None yields an empty list.
        """
        return [cls._to_person(p) for p in (people or []) if p is not None]

    # ----
    # individual magic getters and setters

    def _get_key(self, key: str) -> Union[str, List[str], IgnoreKey]:
        """Return the target-format key for a somesy field (the field name itself if unmapped)."""
        return self.direct_mappings.get(key) or key

    def _get_valid_people(self, field: str) -> List[Any]:
        """Return the entries stored under `field` that `_to_person` can convert.

        Entries are returned in their format-specific representation;
        a missing or empty field yields an empty list.
        """
        people = self._get_property(self._get_key(field))
        if not people:
            return []
        return [p for p in people if self._to_person(p) is not None]

    @property
    def name(self):
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self):
        """Return the authors of the project.

        Only authors that can be converted to Person are returned.
        """
        return self._get_valid_people("authors")

    @authors.setter
    def authors(self, authors: List[Union[Person, Entity]]) -> None:
        """Set the authors of the project."""
        authors = [self._from_person(c) for c in authors]
        self._set_property(self._get_key("authors"), authors)

    @property
    def maintainers(self):
        """Return the maintainers of the project.

        Only maintainers that can be converted to Person are returned.
        """
        return self._get_valid_people("maintainers")

    @maintainers.setter
    def maintainers(self, maintainers: List[Union[Person, Entity]]) -> None:
        """Set the maintainers of the project."""
        maintainers = [self._from_person(c) for c in maintainers]
        self._set_property(self._get_key("maintainers"), maintainers)

    @property
    def contributors(self):
        """Return the contributors of the project (as stored, without conversion check)."""
        return self._get_property(self._get_key("contributors"))

    @contributors.setter
    def contributors(self, contributors: List[Union[Person, Entity]]) -> None:
        """Set the contributors of the project."""
        contributors = [self._from_person(c) for c in contributors]
        self._set_property(self._get_key("contributors"), contributors)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, value: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), value)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, value: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), value)

    @property
    def documentation(self) -> Optional[Union[str, dict]]:
        """Return the documentation url of the project."""
        return self._get_property(self._get_key("documentation"))

    @documentation.setter
    def documentation(self, value: Optional[Union[str, dict]]) -> None:
        """Set the documentation url of the project."""
        self._set_property(self._get_key("documentation"), value)