Coverage for src/somesy/core/writer.py: 97%

149 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-08-10 14:33 +0000

1"""Project metadata writer base-class.""" 

2import logging 

3from abc import ABC, abstractmethod 

4from pathlib import Path 

5from typing import Any, Dict, List, Optional, Union 

6 

7from somesy.core.models import Person, ProjectMetadata 

8 

log = logging.getLogger("somesy")


class ProjectMetadataWriter(ABC):
    """Base class for Project Metadata Output Wrapper.

    All supported output formats are implemented as subclasses.

    A subclass has to implement `_load`, `_validate`, `save`,
    `_from_person` and `_to_person`. The generic property getters and
    setters defined here operate on the `self._data` dict and can be
    redirected to format-specific locations via `direct_mappings`.
    """

    def __init__(
        self,
        path: Path,
        *,
        create_if_not_exists: Optional[bool] = False,
        direct_mappings: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Initialize the Project Metadata Output Wrapper.

        Use the `direct_mappings` dict to define
        format-specific location for certain fields,
        if no additional processing is needed that
        requires a customized setter.

        If `path` is an existing file it is loaded and validated,
        otherwise it is either created or an exception is raised.

        Args:
            path: Path to target output file.
            create_if_not_exists: Create a new target file (see
                `_init_new_file`) if it does not exist. Defaults to False.
            direct_mappings: Dict with direct mappings of keys between
                somesy and target. Defaults to no mappings.

        Raises:
            FileNotFoundError: If `path` is not an existing file and
                `create_if_not_exists` is falsy.
        """
        self._data: Dict = {}
        self.path = path
        self.create_if_not_exists = create_if_not_exists
        self.direct_mappings = direct_mappings or {}

        if self.path.is_file():
            self._load()
            self._validate()
        elif self.create_if_not_exists:
            self._init_new_file()
        else:
            raise FileNotFoundError(f"The file {self.path} does not exist.")

    def _init_new_file(self) -> None:
        """Create a new suitable target file.

        Override to initialize file with minimal contents, if needed.
        Make sure to set `self._data` to match the contents.
        """
        self.path.touch()

    @abstractmethod
    def _load(self) -> None:
        """Load the output file and validate it.

        Implement this method so that it loads the file `self.path`
        into the `self._data` dict.

        The file is guaranteed to exist.
        """

    @abstractmethod
    def _validate(self) -> None:
        """Validate the target file data.

        Implement this method so that it checks
        the validity of the metadata (relevant to somesy)
        in that file and raises exceptions on failure.
        """

    @abstractmethod
    def save(self, path: Optional[Path]) -> None:
        """Save the output file to the given path.

        Implement this in a way that will carefully
        update the target file with new metadata
        without destroying its other contents or structure.
        """

    def _get_property(self, key: Union[str, List[str]]) -> Optional[Any]:
        """Get a property from the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Args:
            key: A single key, or a list of keys describing a path
                into the nested `self._data` dict.

        Returns:
            The value stored at the given location, or None if any
            part of the path is missing, is None, or cannot be
            descended into (e.g. a string or list in the middle of
            the path).
        """
        key_path = [key] if isinstance(key, str) else key

        curr = self._data
        for k in key_path:
            # a non-mapping value in the middle of the path means the
            # requested property does not exist -> report it as missing
            # instead of failing with an AttributeError
            if not hasattr(curr, "get"):
                return None
            curr = curr.get(k)
            if curr is None:
                return None

        return curr

    def _set_property(self, key: Union[str, List[str]], value: Any) -> None:
        """Set a property in the data.

        Override this to e.g. rewrite the retrieved key
        (e.g. if everything relevant is in some subobject).

        Falsy values (None, empty string, empty list, ...) are ignored,
        i.e. existing data is left untouched in that case.

        Args:
            key: A single key, or a list of keys describing a path
                into the nested `self._data` dict.
            value: The value to store at that location.
        """
        if not value:
            return
        key_path = [key] if isinstance(key, str) else key
        # create path on the fly if needed
        curr = self._data
        for part in key_path[:-1]:
            if part not in curr:
                curr[part] = {}
            curr = curr[part]
        curr[key_path[-1]] = value

    # ----
    # special handling for person metadata

    def _merge_person_metadata(
        self, old: List["Person"], new: List["Person"]
    ) -> List["Person"]:
        """Update metadata of a list of persons.

        Will identify people based on orcid, email or full name.

        If old list has same person listed multiple times,
        the resulting list will too (we cannot correctly merge for external formats.)

        Args:
            old: People currently listed in the target file.
            new: People as they should be listed after the update.

        Returns:
            The people from `old` that are matched by somebody in `new`
            (updated with the new field values, in their old order),
            followed by the people from `new` that match nobody in `old`.
        """
        new_people = []  # list for new people (e.g. added authors)
        # flag, meaning "person was not removed"
        still_exists = [False] * len(old)
        # copies of old person data, to be modified
        modified_people = [p.copy() for p in old]

        for person_meta in new:
            person_update = person_meta.dict()
            person_existed = False
            # entries are only replaced in place (never added or removed),
            # so iterating while assigning to modified_people[i] is safe
            for i, person in enumerate(modified_people):
                if not person.same_person(person_meta):
                    continue

                # not new person (-> will not append new record)
                person_existed = True
                # still exists (-> will not be removed from list)
                still_exists[i] = True

                # if there were changes -> update person
                overlapping_fields = person.dict(include=set(person_update.keys()))
                if person_update != overlapping_fields:
                    modified_people[i] = person.copy(update=person_update)

                    # show effective update in debug log
                    old_fmt = self._from_person(person)
                    new_fmt = self._from_person(modified_people[i])
                    if old_fmt != new_fmt:
                        log.debug(f"Updating person\n{old_fmt}\nto\n{new_fmt}")

            if not person_existed:
                new_people.append(person_meta)

        # show added and removed people in debug log
        removed_people = [p for p, kept in zip(old, still_exists) if not kept]
        for person in removed_people:
            pers_fmt = self._from_person(person)
            log.debug(f"Removing person\n{pers_fmt}")
        for person in new_people:
            pers_fmt = self._from_person(person)
            log.debug(f"Adding person\n{pers_fmt}")

        # return updated list of (still existing) people,
        # and all new people coming after them.
        existing_modified = [
            p for p, kept in zip(modified_people, still_exists) if kept
        ]
        return existing_modified + new_people

    def _sync_person_list(self, old: List[Any], new: List["Person"]) -> List[Any]:
        """Merge a format-specific list of people with a list of `Person` objects.

        The entries of `old` are first converted with `_parse_people`,
        then merged with `new` using `_merge_person_metadata`.
        """
        old_people: List["Person"] = self._parse_people(old)
        return self._merge_person_metadata(old_people, new)

    def sync(self, metadata: "ProjectMetadata") -> None:
        """Sync output file with other metadata files.

        Assigns the values of the passed project metadata to the
        corresponding properties of this writer. People listed as
        authors and maintainers are merged with the people already
        present in the target data.
        """
        self.name = metadata.name
        self.description = metadata.description

        if metadata.version:
            self.version = metadata.version

        if metadata.keywords:
            self.keywords = metadata.keywords

        self.authors = self._sync_person_list(self.authors, metadata.authors())
        self.maintainers = self._sync_person_list(
            self.maintainers, metadata.maintainers()
        )

        self.license = metadata.license.value
        if metadata.homepage:
            self.homepage = str(metadata.homepage)
        if metadata.repository:
            self.repository = str(metadata.repository)

    @staticmethod
    @abstractmethod
    def _from_person(person: "Person") -> Any:
        """Convert a `Person` object into suitable target format."""

    @staticmethod
    @abstractmethod
    def _to_person(person_obj: Any) -> "Person":
        """Convert an object representing a person into a `Person` object."""

    @classmethod
    def _parse_people(cls, people: Optional[List[Any]]) -> List["Person"]:
        """Return a list of Persons parsed from list format-specific people representations."""
        return list(map(cls._to_person, people or []))

    # ----
    # individual magic getters and setters

    def _get_key(self, key: str) -> Union[str, List[str]]:
        """Return the location registered for `key` in `direct_mappings`, or `key` itself."""
        return self.direct_mappings.get(key) or key

    @property
    def name(self) -> Optional[str]:
        """Return the name of the project."""
        return self._get_property(self._get_key("name"))

    @name.setter
    def name(self, name: str) -> None:
        """Set the name of the project."""
        self._set_property(self._get_key("name"), name)

    @property
    def version(self) -> Optional[str]:
        """Return the version of the project."""
        return self._get_property(self._get_key("version"))

    @version.setter
    def version(self, version: str) -> None:
        """Set the version of the project."""
        self._set_property(self._get_key("version"), version)

    @property
    def description(self) -> Optional[str]:
        """Return the description of the project."""
        return self._get_property(self._get_key("description"))

    @description.setter
    def description(self, description: str) -> None:
        """Set the description of the project."""
        self._set_property(self._get_key("description"), description)

    @property
    def authors(self) -> Optional[List[Any]]:
        """Return the authors of the project (as stored in the target data)."""
        return self._get_property(self._get_key("authors"))

    @authors.setter
    def authors(self, authors: List["Person"]) -> None:
        """Set the authors of the project."""
        converted = [self._from_person(c) for c in authors]
        self._set_property(self._get_key("authors"), converted)

    @property
    def maintainers(self) -> Optional[List[Any]]:
        """Return the maintainers of the project (as stored in the target data)."""
        return self._get_property(self._get_key("maintainers"))

    @maintainers.setter
    def maintainers(self, maintainers: List["Person"]) -> None:
        """Set the maintainers of the project."""
        converted = [self._from_person(c) for c in maintainers]
        self._set_property(self._get_key("maintainers"), converted)

    @property
    def keywords(self) -> Optional[List[str]]:
        """Return the keywords of the project."""
        return self._get_property(self._get_key("keywords"))

    @keywords.setter
    def keywords(self, keywords: List[str]) -> None:
        """Set the keywords of the project."""
        self._set_property(self._get_key("keywords"), keywords)

    @property
    def license(self) -> Optional[str]:
        """Return the license of the project."""
        return self._get_property(self._get_key("license"))

    @license.setter
    def license(self, license: Optional[str]) -> None:
        """Set the license of the project."""
        self._set_property(self._get_key("license"), license)

    @property
    def homepage(self) -> Optional[str]:
        """Return the homepage url of the project."""
        return self._get_property(self._get_key("homepage"))

    @homepage.setter
    def homepage(self, homepage: Optional[str]) -> None:
        """Set the homepage url of the project."""
        self._set_property(self._get_key("homepage"), homepage)

    @property
    def repository(self) -> Optional[Union[str, dict]]:
        """Return the repository url of the project."""
        return self._get_property(self._get_key("repository"))

    @repository.setter
    def repository(self, repository: Optional[Union[str, dict]]) -> None:
        """Set the repository url of the project."""
        self._set_property(self._get_key("repository"), repository)

318 """Set the repository url of the project.""" 

319 self._set_property(self._get_key("repository"), repository)