Coverage for src/somesy/core/writer.py: 97%
149 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-08-10 14:33 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-08-10 14:33 +0000
1"""Project metadata writer base-class."""
2import logging
3from abc import ABC, abstractmethod
4from pathlib import Path
5from typing import Any, Dict, List, Optional, Union
7from somesy.core.models import Person, ProjectMetadata
9log = logging.getLogger("somesy")
12class ProjectMetadataWriter(ABC):
13 """Base class for Project Metadata Output Wrapper.
15 All supported output formats are implemented as subclasses.
16 """
18 def __init__(
19 self,
20 path: Path,
21 *,
22 create_if_not_exists: Optional[bool] = False,
23 direct_mappings: Dict[str, List[str]] = None,
24 ) -> None:
25 """Initialize the Project Metadata Output Wrapper.
27 Use the `direct_mappings` dict to define
28 format-specific location for certain fields,
29 if no additional processing is needed that
30 requires a customized setter.
32 Args:
33 path: Path to target output file.
34 create_if_not_exists: Create an empty CFF file if not exists. Defaults to True.
35 direct_mappings: Dict with direct mappings of keys between somesy and target
36 """
37 self._data: Dict = {}
38 self.path = path
39 self.create_if_not_exists = create_if_not_exists
40 self.direct_mappings = direct_mappings or {}
42 if self.path.is_file():
43 self._load()
44 self._validate()
45 else:
46 if self.create_if_not_exists:
47 self._init_new_file()
48 else:
49 raise FileNotFoundError(f"The file {self.path} does not exist.")
51 def _init_new_file(self) -> None:
52 """Create an new suitable target file.
54 Override to initialize file with minimal contents, if needed.
55 Make sure to set `self._data` to match the contents.
56 """
57 self.path.touch()
59 @abstractmethod
60 def _load(self):
61 """Load the output file and validate it.
63 Implement this method so that it loads the file `self.path`
64 into the `self._data` dict.
66 The file is guaranteed to exist.
67 """
69 @abstractmethod
70 def _validate(self):
71 """Validate the target file data.
73 Implement this method so that it checks
74 the validity of the metadata (relevant to somesy)
75 in that file and raises exceptions on failure.
76 """
78 @abstractmethod
79 def save(self, path: Optional[Path]) -> None:
80 """Save the output file to the given path.
82 Implement this in a way that will carefully
83 update the target file with new metadata
84 without destroying its other contents or structure.
85 """
87 def _get_property(self, key: Union[str, List[str]]) -> Optional[Any]:
88 """Get a property from the data.
90 Override this to e.g. rewrite the retrieved key
91 (e.g. if everything relevant is in some subobject).
92 """
93 key_path = [key] if isinstance(key, str) else key
95 curr = self._data
96 for k in key_path:
97 curr = curr.get(k)
98 if curr is None:
99 return None
101 return curr
103 def _set_property(self, key: Union[str, List[str]], value: Any) -> None:
104 """Set a property in the data.
106 Override this to e.g. rewrite the retrieved key
107 (e.g. if everything relevant is in some subobject).
108 """
109 if not value:
110 return
111 key_path = [key] if isinstance(key, str) else key
112 # create path on the fly if needed
113 curr = self._data
114 for key in key_path[:-1]:
115 if key not in curr:
116 curr[key] = {}
117 curr = curr[key]
118 curr[key_path[-1]] = value
120 # ----
121 # special handling for person metadata
123 def _merge_person_metadata(
124 self, old: List[Person], new: List[Person]
125 ) -> List[Person]:
126 """Update metadata of a list of persons.
128 Will identify people based on orcid, email or full name.
130 If old list has same person listed multiple times,
131 the resulting list will too (we cannot correctly merge for external formats.)
132 """
133 new_people = [] # list for new people (e.g. added authors)
134 # flag, meaning "person was not removed"
135 still_exists = [False for i in range(len(old))]
136 # copies of old person data, to be modified
137 modified_people = [p.copy() for p in old]
139 for person_meta in new:
140 person_update = person_meta.dict()
141 person_existed = False
142 for i in range(len(modified_people)):
143 person = modified_people[i]
144 if not person.same_person(person_meta):
145 continue
147 # not new person (-> will not append new record)
148 person_existed = True
149 # still exists (-> will not be removed from list)
150 still_exists[i] = True
152 # if there were changes -> update person
153 overlapping_fields = person.dict(include=set(person_update.keys()))
154 if person_update != overlapping_fields:
155 modified_people[i] = person.copy(update=person_update)
157 # show effective update in debug log
158 old_fmt = self._from_person(person)
159 new_fmt = self._from_person(modified_people[i])
160 if old_fmt != new_fmt:
161 log.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")
163 if not person_existed:
164 new_people.append(person_meta)
166 # show added and removed people in debug log
167 removed_people = [old[i] for i in range(len(old)) if not still_exists[i]]
168 for person in removed_people:
169 pers_fmt = self._from_person(person)
170 log.debug(f"Removing person\n{pers_fmt}")
171 for person in new_people:
172 pers_fmt = self._from_person(person)
173 log.debug(f"Adding person\n{pers_fmt}")
175 # return updated list of (still existing) people,
176 # and all new people coming after them.
177 existing_modified = [
178 modified_people[i] for i in range(len(old)) if still_exists[i]
179 ]
180 return existing_modified + new_people
182 def _sync_person_list(self, old: List[Any], new: List[Person]) -> List[Any]:
183 old_people: List[Person] = self._parse_people(old)
184 return self._merge_person_metadata(old_people, new)
186 def sync(self, metadata: ProjectMetadata) -> None:
187 """Sync output file with other metadata files."""
188 self.name = metadata.name
189 self.description = metadata.description
191 if metadata.version:
192 self.version = metadata.version
194 if metadata.keywords:
195 self.keywords = metadata.keywords
197 self.authors = self._sync_person_list(self.authors, metadata.authors())
198 self.maintainers = self._sync_person_list(
199 self.maintainers, metadata.maintainers()
200 )
202 self.license = metadata.license.value
203 if metadata.homepage:
204 self.homepage = str(metadata.homepage)
205 if metadata.repository:
206 self.repository = str(metadata.repository)
208 @staticmethod
209 @abstractmethod
210 def _from_person(person: Person) -> Any:
211 """Convert a `Person` object into suitable target format."""
213 @staticmethod
214 @abstractmethod
215 def _to_person(person_obj: Any) -> Person:
216 """Convert an object representing a person into a `Person` object."""
218 @classmethod
219 def _parse_people(cls, people: Optional[List[Any]]) -> List[Person]:
220 """Return a list of Persons parsed from list format-specific people representations."""
221 return list(map(cls._to_person, people or []))
223 # ----
224 # individual magic getters and setters
226 def _get_key(self, key):
227 return self.direct_mappings.get(key) or key
229 @property
230 def name(self):
231 """Return the name of the project."""
232 return self._get_property(self._get_key("name"))
234 @name.setter
235 def name(self, name: str) -> None:
236 """Set the name of the project."""
237 self._set_property(self._get_key("name"), name)
239 @property
240 def version(self) -> Optional[str]:
241 """Return the version of the project."""
242 return self._get_property(self._get_key("version"))
244 @version.setter
245 def version(self, version: str) -> None:
246 """Set the version of the project."""
247 self._set_property(self._get_key("version"), version)
249 @property
250 def description(self) -> Optional[str]:
251 """Return the description of the project."""
252 return self._get_property(self._get_key("description"))
254 @description.setter
255 def description(self, description: str) -> None:
256 """Set the description of the project."""
257 self._set_property(self._get_key("description"), description)
259 @property
260 def authors(self):
261 """Return the authors of the project."""
262 return self._get_property(self._get_key("authors"))
264 @authors.setter
265 def authors(self, authors: List[Person]) -> None:
266 """Set the authors of the project."""
267 authors = [self._from_person(c) for c in authors]
268 self._set_property(self._get_key("authors"), authors)
270 @property
271 def maintainers(self):
272 """Return the maintainers of the project."""
273 return self._get_property(self._get_key("maintainers"))
275 @maintainers.setter
276 def maintainers(self, maintainers: List[Person]) -> None:
277 """Set the maintainers of the project."""
278 maintainers = [self._from_person(c) for c in maintainers]
279 self._set_property(self._get_key("maintainers"), maintainers)
281 @property
282 def keywords(self) -> Optional[List[str]]:
283 """Return the keywords of the project."""
284 return self._get_property(self._get_key("keywords"))
286 @keywords.setter
287 def keywords(self, keywords: List[str]) -> None:
288 """Set the keywords of the project."""
289 self._set_property(self._get_key("keywords"), keywords)
291 @property
292 def license(self) -> Optional[str]:
293 """Return the license of the project."""
294 return self._get_property(self._get_key("license"))
296 @license.setter
297 def license(self, license: Optional[str]) -> None:
298 """Set the license of the project."""
299 self._set_property(self._get_key("license"), license)
301 @property
302 def homepage(self) -> Optional[str]:
303 """Return the homepage url of the project."""
304 return self._get_property(self._get_key("homepage"))
306 @homepage.setter
307 def homepage(self, homepage: Optional[str]) -> None:
308 """Set the homepage url of the project."""
309 self._set_property(self._get_key("homepage"), homepage)
311 @property
312 def repository(self) -> Optional[Union[str, dict]]:
313 """Return the repository url of the project."""
314 return self._get_property(self._get_key("repository"))
316 @repository.setter
317 def repository(self, repository: Optional[Union[str, dict]]) -> None:
318 """Set the repository url of the project."""
319 self._set_property(self._get_key("repository"), repository)