Coverage for src/somesy/core/writer.py: 95%
181 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-04-30 09:42 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-04-30 09:42 +0000
1"""Project metadata writer base-class."""
2import logging
3from abc import ABC, abstractmethod
4from pathlib import Path
5from typing import Any, Dict, List, Optional, Union
7from somesy.core.models import Person, ProjectMetadata
9logger = logging.getLogger("somesy")
class IgnoreKey:
    """Special marker to be passed for dropping a key from serialization.

    An instance of this class is used as a value in a `direct_mappings`
    dict instead of a key path. `ProjectMetadataWriter._set_property`
    returns without writing anything when it is handed such an instance.
    The class carries no state; only its type is checked (via `isinstance`).
    """
# Keys are field names as used by the writer's properties (e.g. "name",
# "authors"). Each value is either a list of strings that is walked as a
# sequence of nested keys into the target data, or an `IgnoreKey` instance
# marking the field as one that must not be written.
FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type to be used for the dict passed as `direct_mappings`."""

# The writer only relies on `.get()`, item assignment, item deletion and
# `in` checks on this object, so any mapping-like container works.
DictLike = Any
"""Dict-like that supports getitem, setitem, delitem, etc.

NOTE: This should be probably turned into a proper protocol.
"""
class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    A subclass has to provide `_load`, `_validate`, `save`, `_from_person`
    and `_to_person`. The generic properties defined here (`name`,
    `version`, `authors`, ...) read from and write to `self._data`, using
    `direct_mappings` to translate a field name into a format-specific key
    path.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[FieldKeyMapping] = None,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create a new target file (see
                `_init_new_file`) if it does not exist. Defaults to False.
            direct_mappings: Dict with direct mappings of keys between somesy
                and target. Defaults to an empty mapping.

        Raises:
            FileNotFoundError: If the file does not exist and
                `create_if_not_exists` is falsy.
        """
        self._data: DictLike = {}
        self.path = path if isinstance(path, Path) else Path(path)
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}

        if self.path.is_file():
            self._load()
            self._validate()
        elif self.create_if_not_exists:
            # A freshly created file is loaded but not validated.
            self._init_new_file()
            self._load()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self):
        """Load the output file and validate it.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self):
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path]) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.
        """

    def _get_property(
        self,
        key: Union[str, List[str], IgnoreKey],
        *,
        only_first: bool = False,
        remove: bool = False,
    ) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to retrieve the value.
                An `IgnoreKey` marker yields `None` without touching the data.
            only_first: If True, returns only first entry if the value is a list.
                An empty list is treated like a missing value.
            remove: If True, will remove the retrieved value and clean up the dict.

        Returns:
            The value found at the key path, or `None` if any key along the
            path is missing (or maps to `None`).
        """
        # Fields mapped to IgnoreKey do not exist in the target format;
        # `_get_key` hands the marker through, so it must be handled here.
        if isinstance(key, IgnoreKey):
            return None
        key_path = [key] if isinstance(key, str) else key

        curr = self._data
        # seq[i] is the container that is indexed with key_path[i];
        # the last entry appended is the retrieved leaf value itself.
        seq = [curr]
        for k in key_path:
            curr = curr.get(k)
            if only_first and isinstance(curr, list):
                # guard against IndexError on an empty list
                curr = curr[0] if curr else None
            seq.append(curr)
            if curr is None:
                return None

        if remove:
            seq.pop()  # drop the leaf value, seq[-1] is now its parent
            del seq[-1][key_path[-1]]  # remove leaf value
            # clean up the tree: walk back up and drop containers that
            # became empty (or otherwise falsy) because of the removal
            parents = list(zip(key_path[:-1], seq[:-1]))
            for parent_key, container in reversed(parents):
                if not container.get(parent_key):
                    del container[parent_key]

        if only_first and isinstance(curr, list):
            return curr[0] if curr else None
        return curr

    def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
        """Set a property in the data.

        A falsy `value` (e.g. `None`, empty string or empty list) removes
        the key instead and cleans up containers that became empty.
        Missing intermediate containers along the key path are created
        as empty dicts.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to write to.
                Nothing is written if this is an `IgnoreKey` marker.
            value: The value to store at the key path.
        """
        if isinstance(key, IgnoreKey):
            return
        key_path = [key] if isinstance(key, str) else key

        if not value:  # remove value and clean up the sub-dict
            self._get_property(key_path, remove=True)
            return

        # create path on the fly if needed
        curr = self._data
        for part in key_path[:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]

        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List[Person], new: List[Person]
    ) -> List[Person]:
        """Update metadata of a list of persons.

        People are matched using `Person.same_person`.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: Persons currently present in the target file.
            new: Persons as they should be after the update.

        Returns:
            The (possibly updated) persons from `old` that were matched by
            someone in `new`, in their original order, followed by the
            persons from `new` that matched nobody in `old`.
        """
        new_people: List[Person] = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.model_copy() for p in old]

        # try to match new people to existing old ones
        # (inefficient, but author list are not that long usually)
        for person_meta in new:
            person_update = person_meta.model_dump()
            person_existed = False
            # no early exit: duplicates in the old list are all updated
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.model_dump(
                    include=set(person_update.keys())
                )
                if person_update != overlapping_fields:
                    modified_people[i] = person.model_copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        logger.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        for person, kept in zip(old, still_exists):
            if not kept:
                logger.debug(f"Removing person\n{self._from_person(person)}")
        for person in new_people:
            logger.debug(f"Adding person\n{self._from_person(person)}")

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            person for person, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    def _sync_person_list(self, old: List[Any], new: List[Person]) -> List[Person]:
        """Sync a list of persons with new metadata.

        Args:
            old (List[Any]): list of persons in format-specific representation
            new (List[Person]): list of persons in somesy representation

        Returns:
            List[Person]: merged list of persons in somesy representation
            (the person setters convert it to the format-specific
            representation using `_from_person`)
        """
        old_people: List[Person] = self._parse_people(old)
        return self._merge_person_metadata(old_people, new)

    def _sync_authors(self, metadata: ProjectMetadata) -> None:
        """Sync output file authors with authors from metadata.

        This method is existing for the publication_author special case
        when synchronizing to CITATION.cff.
        """
        self.authors = self._sync_person_list(self.authors, metadata.authors())

    def sync(self, metadata: ProjectMetadata) -> None:
        """Sync output file with other metadata files.

        Name, description, license and the URL fields are always assigned
        (a falsy value removes the field from the data), whereas version and
        keywords are only written when present in `metadata`.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self._sync_authors(metadata)
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        self.license = metadata.license.value

        self.homepage = str(metadata.homepage) if metadata.homepage else None
        self.repository = str(metadata.repository) if metadata.repository else None
        self.documentation = (
            str(metadata.documentation) if metadata.documentation else None
        )

    @staticmethod
    @abstractmethod
    def _from_person(person: Person) -> Any:
        """Convert a `Person` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> Person:
        """Convert an object representing a person into a `Person` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List[Person]:
        """Return a list of Persons parsed from list of format-specific people representations."""
        return [cls._to_person(person_obj) for person_obj in people or []]

    # ----
    # individual magic getters and setters

    def _get_key(self, key: str) -> Union[str, List[str], IgnoreKey]:
        """Return the format-specific key for a field, falling back to its name."""
        return self.direct_mappings.get(key) or key

    @property
    def name(self):
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self):
        """Return the authors of the project."""
        return self._get_property(self._get_key("authors"))

    @authors.setter
    def authors(self, authors: List[Person]) -> None:
        """Set the authors of the project."""
        authors = [self._from_person(c) for c in authors]
        self._set_property(self._get_key("authors"), authors)

    @property
    def maintainers(self):
        """Return the maintainers of the project."""
        return self._get_property(self._get_key("maintainers"))

    @maintainers.setter
    def maintainers(self, maintainers: List[Person]) -> None:
        """Set the maintainers of the project."""
        maintainers = [self._from_person(c) for c in maintainers]
        self._set_property(self._get_key("maintainers"), maintainers)

    @property
    def contributors(self):
        """Return the contributors of the project."""
        return self._get_property(self._get_key("contributors"))

    @contributors.setter
    def contributors(self, contributors: List[Person]) -> None:
        """Set the contributors of the project."""
        contributors = [self._from_person(c) for c in contributors]
        self._set_property(self._get_key("contributors"), contributors)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, value: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), value)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, value: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), value)

    @property
    def documentation(self) -> Optional[Union[str, dict]]:
        """Return the documentation url of the project."""
        return self._get_property(self._get_key("documentation"))

    @documentation.setter
    def documentation(self, value: Optional[Union[str, dict]]) -> None:
        """Set the documentation url of the project."""
        self._set_property(self._get_key("documentation"), value)