Coverage for src/somesy/core/writer.py: 95%
198 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-10 14:56 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-10 14:56 +0000
1"""Project metadata writer base-class."""
3import logging
4from abc import ABC, abstractmethod
5from pathlib import Path
6from typing import Any, Dict, List, Optional, Union
8from somesy.core.models import Entity, Person, ProjectMetadata
10logger = logging.getLogger("somesy")
class IgnoreKey:
    """Sentinel type used in place of a key path.

    Map a field to an instance of this class to have that field
    left out when the metadata is serialized.
    """
# Maps a somesy field name (e.g. "name", "authors") either to the list of
# nested keys under which the value is stored in the target file, or to an
# `IgnoreKey` marker when the field must not be written at all.
FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type to be used for the dict passed as `direct_mappings`."""

# Intentionally untyped alias: the writer only relies on mapping-style access
# (`get`, item assignment, item deletion, `in`) on the loaded document.
DictLike = Any
"""Dict-like that supports getitem, setitem, delitem, etc.

NOTE: This should be probably turned into a proper protocol.
"""
class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    A subclass has to provide `_load`, `_validate`, `save`, `_from_person`
    and `_to_person`. The generic property getters/setters defined here read
    and write `self._data` using the key paths given in `direct_mappings`.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[FieldKeyMapping] = None,
        merge: Optional[bool] = False,
        pass_validation: Optional[bool] = False,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create an empty target file if it does not exist. Defaults to False.
            direct_mappings: Dict with direct mappings of keys between somesy and target
            merge: Merge the output file with an existing file. Defaults to False.
            pass_validation: Pass validation for all output files. Defaults to False.

        Raises:
            FileNotFoundError: If the file does not exist and
                `create_if_not_exists` is not set.

        """
        self._data: DictLike = {}
        self.path = path if isinstance(path, Path) else Path(path)
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}
        self.merge = merge
        self.pass_validation = pass_validation

        if self.path.is_file():
            self._load()
            if not self.pass_validation:
                self._validate()
        elif self.create_if_not_exists:
            # a freshly created file is loaded, but not validated
            self._init_new_file()
            self._load()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self):
        """Load the output file and validate it.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self) -> None:
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path]) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.
        """

    def _get_property(
        self,
        key: Union[str, List[str], IgnoreKey],
        *,
        only_first: bool = False,
        remove: bool = False,
    ) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to retrieve the value.
                An `IgnoreKey` marker always yields `None`.
            only_first: If True, returns only first entry if the value is a list
                (an empty list is treated like a missing value).
            remove: If True, will remove the retrieved value and clean up the dict.

        Returns:
            The value found at the key path, or `None` if any key along
            the path is missing.

        """
        if isinstance(key, str):
            key_path = [key]
        elif isinstance(key, IgnoreKey):
            # ignored fields are never stored, so there is nothing to read
            return None
        else:
            key_path = key

        curr = self._data
        seq = [curr]  # every container visited on the way down, root first
        for k in key_path:
            curr = curr.get(k)
            if only_first and isinstance(curr, list):
                # guard against empty lists instead of raising IndexError
                curr = curr[0] if curr else None
            seq.append(curr)
            if curr is None:
                return None

        if remove:
            seq.pop()  # drop the leaf value, so seq[-1] is its parent
            del seq[-1][key_path[-1]]  # remove leaf value
            # clean up the tree: walk back up and drop containers that became empty
            for parent_key, parent in reversed(list(zip(key_path[:-1], seq[:-1]))):
                if not parent.get(parent_key):
                    del parent[parent_key]

        if only_first and isinstance(curr, list):
            return curr[0] if curr else None
        return curr

    def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
        """Set a property in the data.

        A falsy `value` (e.g. `None`, empty string or empty list) removes the
        key instead, cleaning up parent dicts that become empty. Missing
        intermediate keys along the path are created as empty dicts.
        Nothing happens if `key` is an `IgnoreKey` marker.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).
        """
        if isinstance(key, str):
            key_path = [key]
        elif isinstance(key, IgnoreKey):
            return
        else:
            key_path = key

        if not value:  # remove value and clean up the sub-dict
            self._get_property(key_path, remove=True)
            return

        # create path on the fly if needed
        curr = self._data
        for part in key_path[:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]

        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List[Union[Person, Entity]], new: List[Union[Person, Entity]]
    ) -> List[Union[Person, Entity]]:
        """Update metadata of a list of persons.

        People are matched using their `same_person` method.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: people currently present in the target file.
            new: people as they should be after the update.

        Returns:
            The matched (and possibly updated) old entries in their original
            order, followed by all entries of `new` that matched nobody.
            Old entries without a match in `new` are dropped.

        """
        new_people = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.model_copy() for p in old]

        # try to match new people to existing old ones
        # (inefficient, but author list are not that long usually)
        for person_meta in new:
            person_update = person_meta.model_dump()
            person_existed = False
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.model_dump(
                    include=set(person_update.keys())
                )
                if person_update != overlapping_fields:
                    modified_people[i] = person.model_copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        logger.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        removed_people = [p for p, kept in zip(old, still_exists) if not kept]
        for person in removed_people:
            logger.debug(f"Removing person\n{self._from_person(person)}")
        for person in new_people:
            logger.debug(f"Adding person\n{self._from_person(person)}")

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            p for p, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    def _sync_person_list(
        self, old: List[Any], new: List[Union[Person, Entity]]
    ) -> List[Any]:
        """Sync a list of persons with new metadata.

        Args:
            old (List[Any]): list of persons in format-specific representation
            new (List[Person]): list of persons in somesy representation

        Returns:
            List[Any]: updated list of persons in somesy representation
            (the property setters convert them using `_from_person`).
            Never `None`; an empty list is returned if both inputs are empty.

        """
        old_people: List[Union[Person, Entity]] = self._parse_people(old)
        if not old_people:
            # nothing to merge with; normalize a missing list to an empty one
            return new or []
        if not new:
            return old_people
        return self._merge_person_metadata(old_people, new)

    def _sync_authors(self, metadata: ProjectMetadata) -> None:
        """Sync output file authors with authors from metadata.

        This method is existing for the publication_author special case
        when synchronizing to CITATION.cff.
        """
        current_authors = self.authors
        if not current_authors:
            self.authors = metadata.authors()
        else:
            self.authors = self._sync_person_list(current_authors, metadata.authors())

    def sync(self, metadata: ProjectMetadata) -> None:
        """Sync output file with other metadata files.

        Name, description, license, homepage, repository and documentation
        are always written (a falsy value removes the key); version and
        keywords are only written when set in `metadata`.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self._sync_authors(metadata)
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        # NOTE(review): assumes `metadata.license` is always set - confirm
        self.license = metadata.license.value

        self.homepage = str(metadata.homepage) if metadata.homepage else None
        self.repository = str(metadata.repository) if metadata.repository else None
        self.documentation = (
            str(metadata.documentation) if metadata.documentation else None
        )

    @staticmethod
    @abstractmethod
    def _from_person(person: Union[Person, Entity]) -> Any:
        """Convert a `Person` or `Entity` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> Union[Person, Entity]:
        """Convert an object representing a person into a `Person` or `Entity` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List[Union[Person, Entity]]:
        """Return a list of Persons and Entities parsed from list of format-specific people representations.

        `None` entries are skipped; `None` instead of a list yields an empty list.
        """
        # the `or []` has to be applied before iterating, otherwise None raises
        return [cls._to_person(p) for p in (people or []) if p is not None]

    # ----
    # individual magic getters and setters

    def _get_key(self, key):
        """Return the mapped key (path) for a field, or the field name itself if unmapped."""
        return self.direct_mappings.get(key) or key

    @property
    def name(self):
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self):
        """Return the authors of the project."""
        authors = self._get_property(self._get_key("authors"))
        if not authors:
            return []

        # only return authors that can be converted to Person
        return [author for author in authors if self._to_person(author) is not None]

    @authors.setter
    def authors(self, authors: List[Union[Person, Entity]]) -> None:
        """Set the authors of the project."""
        authors = [self._from_person(c) for c in authors]
        self._set_property(self._get_key("authors"), authors)

    @property
    def maintainers(self):
        """Return the maintainers of the project."""
        maintainers = self._get_property(self._get_key("maintainers"))
        if not maintainers:
            return []

        # only return maintainers that can be converted to Person
        return [
            maintainer
            for maintainer in maintainers
            if self._to_person(maintainer) is not None
        ]

    @maintainers.setter
    def maintainers(self, maintainers: List[Union[Person, Entity]]) -> None:
        """Set the maintainers of the project."""
        maintainers = [self._from_person(c) for c in maintainers]
        self._set_property(self._get_key("maintainers"), maintainers)

    @property
    def contributors(self):
        """Return the contributors of the project."""
        return self._get_property(self._get_key("contributors"))

    @contributors.setter
    def contributors(self, contributors: List[Union[Person, Entity]]) -> None:
        """Set the contributors of the project."""
        contributors = [self._from_person(c) for c in contributors]
        self._set_property(self._get_key("contributors"), contributors)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, value: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), value)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, value: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), value)

    @property
    def documentation(self) -> Optional[Union[str, dict]]:
        """Return the documentation url of the project."""
        return self._get_property(self._get_key("documentation"))

    @documentation.setter
    def documentation(self, value: Optional[Union[str, dict]]) -> None:
        """Set the documentation url of the project."""
        self._set_property(self._get_key("documentation"), value)