Coverage for src/somesy/core/writer.py: 95%
187 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-29 07:50 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-29 07:50 +0000
1"""Project metadata writer base-class."""
3import logging
4from abc import ABC, abstractmethod
5from pathlib import Path
6from typing import Any, Dict, List, Optional, Union
8from somesy.core.models import Person, ProjectMetadata
# Shared package-wide logger: every somesy module logs under the "somesy" name
# so that verbosity can be configured in one place.
logger = logging.getLogger("somesy")
class IgnoreKey:
    """Special marker to be passed for dropping a key from serialization.

    Use an instance of this class as the value in a `direct_mappings` dict
    to state that the corresponding field has no location in the target
    format. Setting a property whose key is an `IgnoreKey` is a no-op.
    """
# Maps a somesy field name (e.g. "name", "version") either to the key path
# under which the value lives in the target file, or to an `IgnoreKey`
# instance if the field must not be written at all.
FieldKeyMapping = Dict[str, Union[List[str], IgnoreKey]]
"""Type to be used for the dict passed as `direct_mappings`."""

DictLike = Any
"""Dict-like that supports getitem, setitem, delitem, etc.

NOTE: This should probably be turned into a proper protocol.
"""
class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    Subclasses must implement `_load`, `_validate`, `save`, `_from_person`
    and `_to_person`. The generic property getters/setters defined here read
    and write `self._data` through `_get_property` / `_set_property`, using
    `direct_mappings` to translate somesy field names into format-specific
    key paths.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[FieldKeyMapping] = None,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create a new target file if it does not exist yet. Defaults to False.
            direct_mappings: Dict with direct mappings of keys between somesy and target

        Raises:
            FileNotFoundError: If the file does not exist and `create_if_not_exists` is falsy.

        """
        self._data: DictLike = {}
        self.path = path if isinstance(path, Path) else Path(path)
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}

        if self.path.is_file():
            self._load()
            self._validate()
        elif self.create_if_not_exists:
            # a freshly initialized file is loaded, but not validated
            self._init_new_file()
            self._load()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self):
        """Load the output file and validate it.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self):
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path]) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.
        """

    def _get_property(
        self,
        key: Union[str, List[str]],
        *,
        only_first: bool = False,
        remove: bool = False,
    ) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys to retrieve the value.
            only_first: If True, returns only first entry if the value is a list.
                This also applies to lists encountered along the key path.
                An empty list is treated like a missing value.
            remove: If True, will remove the retrieved value and clean up the dict.

        Returns:
            The value found at the key path, or None if any key along
            the path is missing (or maps to None).

        """
        key_path = [key] if isinstance(key, str) else key

        curr = self._data
        seq = [curr]  # visited containers, root first
        for k in key_path:
            curr = curr.get(k)
            if only_first and isinstance(curr, list):
                # an empty list has no first entry -> treat it as missing
                curr = curr[0] if curr else None
            seq.append(curr)
            if curr is None:
                return None

        if remove:
            seq.pop()  # drop the leaf value, seq[-1] is now its parent
            del seq[-1][key_path[-1]]  # remove leaf value
            # clean up the tree: walk back up and drop containers left empty
            for parent_key, parent in reversed(list(zip(key_path[:-1], seq[:-1]))):
                if not parent.get(parent_key):
                    del parent[parent_key]

        if isinstance(curr, list) and only_first:
            return curr[0] if curr else None
        return curr

    def _set_property(self, key: Union[str, List[str], IgnoreKey], value: Any) -> None:
        """Set a property in the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: Name of the key or sequence of multiple keys locating the value.
                If it is an `IgnoreKey` instance, nothing is done.
            value: Value to store. A falsy value removes the key
                (and parent containers that become empty) instead.

        """
        if isinstance(key, IgnoreKey):
            return
        key_path = [key] if isinstance(key, str) else key

        if not value:  # remove value and clean up the sub-dict
            self._get_property(key_path, remove=True)
            return

        # create path on the fly if needed
        curr = self._data
        for part in key_path[:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]

        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List[Person], new: List[Person]
    ) -> List[Person]:
        """Update metadata of a list of persons.

        People are matched using `Person.same_person`.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: persons currently present in the target file
            new: persons as they should be after the update

        Returns:
            Updated copies of the old persons that still exist (in their
            old order), followed by all persons that were newly added.

        """
        new_people = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.model_copy() for p in old]

        # try to match new people to existing old ones
        # (inefficient, but author list are not that long usually)
        for person_meta in new:
            person_update = person_meta.model_dump()
            person_existed = False
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.model_dump(
                    include=set(person_update.keys())
                )
                if person_update != overlapping_fields:
                    modified_people[i] = person.model_copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        logger.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        removed_people = [p for p, kept in zip(old, still_exists) if not kept]
        for person in removed_people:
            logger.debug(f"Removing person\n{self._from_person(person)}")
        for person in new_people:
            logger.debug(f"Adding person\n{self._from_person(person)}")

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            p for p, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    def _sync_person_list(self, old: List[Any], new: List[Person]) -> List[Person]:
        """Sync a list of persons with new metadata.

        Args:
            old (List[Any]): list of persons in format-specific representation
            new (List[Person]): list of persons in somesy representation

        Returns:
            List[Person]: updated list of persons in somesy representation
            (the property setters convert them to the target format).

        """
        old_people: List[Person] = self._parse_people(old)
        return self._merge_person_metadata(old_people, new)

    def _sync_authors(self, metadata: ProjectMetadata) -> None:
        """Sync output file authors with authors from metadata.

        This method is existing for the publication_author special case
        when synchronizing to CITATION.cff.
        """
        self.authors = self._sync_person_list(self.authors, metadata.authors())

    def sync(self, metadata: ProjectMetadata) -> None:
        """Sync output file with other metadata files.

        Name, description, people, license and URLs are always written.
        Version and keywords are only written if set in `metadata`.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self._sync_authors(metadata)
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        self.license = metadata.license.value

        # URLs are stored as plain strings; an unset URL removes the key
        self.homepage = str(metadata.homepage) if metadata.homepage else None
        self.repository = str(metadata.repository) if metadata.repository else None
        self.documentation = (
            str(metadata.documentation) if metadata.documentation else None
        )

    @staticmethod
    @abstractmethod
    def _from_person(person: Person) -> Any:
        """Convert a `Person` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> Person:
        """Convert an object representing a person into a `Person` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List[Person]:
        """Return a list of Persons parsed from list of format-specific people representations."""
        return [cls._to_person(person_obj) for person_obj in people or []]

    # ----
    # individual magic getters and setters

    def _get_key(self, key):
        """Return the format-specific key (path) for a somesy field name.

        Falls back to the field name itself if `direct_mappings`
        has no (truthy) entry for it.
        """
        return self.direct_mappings.get(key) or key

    @property
    def name(self):
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self):
        """Return the authors of the project."""
        authors = self._get_property(self._get_key("authors"))
        if authors is None:
            return []

        # only return authors that can be converted to Person
        return [author for author in authors if self._to_person(author) is not None]

    @authors.setter
    def authors(self, authors: List[Person]) -> None:
        """Set the authors of the project."""
        authors = [self._from_person(c) for c in authors]
        self._set_property(self._get_key("authors"), authors)

    @property
    def maintainers(self):
        """Return the maintainers of the project."""
        maintainers = self._get_property(self._get_key("maintainers"))
        if maintainers is None:
            return []

        # only return maintainers that can be converted to Person
        return [
            maintainer
            for maintainer in maintainers
            if self._to_person(maintainer) is not None
        ]

    @maintainers.setter
    def maintainers(self, maintainers: List[Person]) -> None:
        """Set the maintainers of the project."""
        maintainers = [self._from_person(c) for c in maintainers]
        self._set_property(self._get_key("maintainers"), maintainers)

    @property
    def contributors(self):
        """Return the contributors of the project."""
        return self._get_property(self._get_key("contributors"))

    @contributors.setter
    def contributors(self, contributors: List[Person]) -> None:
        """Set the contributors of the project."""
        contributors = [self._from_person(c) for c in contributors]
        self._set_property(self._get_key("contributors"), contributors)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, value: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), value)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, value: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), value)

    @property
    def documentation(self) -> Optional[Union[str, dict]]:
        """Return the documentation url of the project."""
        return self._get_property(self._get_key("documentation"))

    @documentation.setter
    def documentation(self, value: Optional[Union[str, dict]]) -> None:
        """Set the documentation url of the project."""
        self._set_property(self._get_key("documentation"), value)