Coverage for src/somesy/core/models.py: 97%

182 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-08-10 14:33 +0000

1"""Core models for the somesy package.""" 

2from __future__ import annotations 

3 

4import functools 

5import json 

6from datetime import date 

7from pathlib import Path 

8from typing import Any, Dict, List, Optional 

9 

10from pydantic import ( 

11 AnyUrl, 

12 BaseModel, 

13 Extra, 

14 Field, 

15 PrivateAttr, 

16 root_validator, 

17 validator, 

18) 

19from rich.pretty import pretty_repr 

20from typing_extensions import Annotated 

21 

22from .core import get_input_content 

23from .log import SomesyLogLevel 

24from .types import ContributionTypeEnum, Country, LicenseEnum 

25 

26# -------- 

27# Somesy configuration model 

28 

29 

# Names of all sync targets. Each entry `x` corresponds to a `no_sync_x` flag
# on SomesyConfig; `package_json` was missing although the flag exists.
SOMESY_TARGETS = ["cff", "pyproject", "package_json", "codemeta"]

31 

32 

class SomesyBaseModel(BaseModel):
    """Customized pydantic BaseModel for somesy.

    Apart from some general tweaks for better defaults,
    adds a private `_key_order` field, which is used to track the
    preferred order for serialization (usually coming from some existing input).

    It can be set on an instance using the set_key_order method,
    and is preserved by `copy()`.

    NOTE: The custom order is intended for leaf models (no further nested models),
    custom order will not work correctly across nesting layers.
    """

    class Config:
        """Pydantic config."""

        extra = Extra.forbid
        allow_population_by_field_name = True
        underscore_attrs_are_private = True
        anystr_strip_whitespace = True
        min_anystr_length = 1

    # ----
    # Key order magic

    _key_order: List[str] = PrivateAttr([])
    """List of field names (NOT aliases!) in the order they should be written in."""

    @classmethod
    @functools.lru_cache()  # compute once per class
    def _aliases(cls) -> Dict[str, str]:
        """Map back from alias field names to internal field names."""
        return {v.alias: k for k, v in cls.__fields__.items()}

    @classmethod
    def make_partial(cls, dct):
        """Construct unvalidated partial model from dict.

        Handles aliases correctly, unlike `construct`.
        """
        un_alias = cls._aliases()
        return cls.construct(**{un_alias.get(k) or k: v for k, v in dct.items()})

    def set_key_order(self, keys: List[str]):
        """Setter for custom key order used in serialization.

        Aliases in `keys` are translated to the actual field names,
        unknown names are kept as they are.
        """
        un_alias = self._aliases()
        # make sure we use the _actual_ field names
        self._key_order = [un_alias.get(k) or k for k in keys]

    def copy(self, *args, **kwargs):
        """Patched copy method (to preserve custom key order)."""
        ret = super().copy(*args, **kwargs)
        ret.set_key_order(list(self._key_order))
        return ret

    @staticmethod
    def _patch_kwargs_defaults(kwargs):
        """Default the `exclude_*` serialization flags to True (in-place).

        Only flags that are missing (or None) are filled in, so that a caller
        explicitly passing `False` is respected instead of silently overridden.
        """
        for key in ("exclude_defaults", "exclude_none", "exclude_unset"):
            if kwargs.get(key) is None:
                kwargs[key] = True

    def _reorder_dict(self, dct):
        """Return dict with patched key order (according to `self._key_order`).

        Keys in `dct` not listed in `self._key_order` come after all others,
        in the order in which they appear in `dct` (deterministic output).

        Used to patch up `dict()` and `json()`.
        """
        preferred = [k for k in (self._key_order or []) if k in dct]
        listed = set(preferred)
        # iterate dct itself (not a set) to keep the remaining keys stable
        remaining = [k for k in dct if k not in listed]
        return {k: dct[k] for k in preferred + remaining}

    def dict(self, *args, **kwargs):
        """Patched dict method (to preserve custom key order)."""
        self._patch_kwargs_defaults(kwargs)
        by_alias = kwargs.pop("by_alias", False)

        # serialize by field name first, the key order is tracked by field name
        dct = super().dict(*args, **kwargs, by_alias=False)
        ret = self._reorder_dict(dct)

        if by_alias:
            ret = {self.__fields__[k].alias: v for k, v in ret.items()}
        return ret

    def json(self, *args, **kwargs):
        """Patched json method (to preserve custom key order).

        NOTE: the result is re-serialized with a plain `json.dumps`, so extra
        formatting arguments only affect the intermediate string.
        """
        self._patch_kwargs_defaults(kwargs)
        by_alias = kwargs.pop("by_alias", False)

        # loop back json through dict to apply custom key order
        dct = json.loads(super().json(*args, **kwargs, by_alias=False))
        ret = self._reorder_dict(dct)

        if by_alias:
            ret = {self.__fields__[k].alias: v for k, v in ret.items()}
        return json.dumps(ret)

132 

133 

class SomesyConfig(SomesyBaseModel):
    """Pydantic model for somesy tool configuration.

    Note that all fields match CLI options, and CLI options will override the
    values declared in a somesy input file (such as `somesy.toml`).
    """

    @root_validator
    def at_least_one_target(cls, values):
        """Check that at least one output file is enabled.

        All `no_sync_*` fields of the model are taken into account, so every
        declared sync target (including package.json) counts as a valid target.

        Raises:
            ValueError: if every sync target is disabled.
        """
        sync_flags = [name for name in cls.__fields__ if name.startswith("no_sync_")]
        if all(values.get(flag) for flag in sync_flags):
            msg = "No sync target enabled, nothing to do. Probably this is a mistake?"
            raise ValueError(msg)

        return values

    # cli flags
    show_info: Annotated[
        bool,
        Field(
            description="Show basic information messages on run (-v flag).",
        ),
    ] = False
    verbose: Annotated[
        bool, Field(description="Show verbose messages on run (-vv flag).")
    ] = False
    debug: Annotated[
        bool, Field(description="Show debug messages on run (-vvv flag).")
    ] = False

    input_file: Annotated[
        Path, Field(description="Project metadata input file path.")
    ] = Path("somesy.toml")

    no_sync_pyproject: Annotated[
        bool, Field(description="Do not sync with pyproject.toml.")
    ] = False
    pyproject_file: Annotated[
        Path, Field(description="pyproject.toml file path.")
    ] = Path("pyproject.toml")

    no_sync_package_json: Annotated[
        bool, Field(description="Do not sync with package.json.")
    ] = False
    package_json_file: Annotated[
        Path, Field(description="package.json file path.")
    ] = Path("package.json")

    no_sync_cff: Annotated[bool, Field(description="Do not sync with CFF.")] = False
    cff_file: Annotated[Path, Field(description="CFF file path.")] = Path(
        "CITATION.cff"
    )

    no_sync_codemeta: Annotated[
        bool, Field(description="Do not sync with codemeta.json.")
    ] = False
    codemeta_file: Annotated[
        Path, Field(description="codemeta.json file path.")
    ] = Path("codemeta.json")

    def log_level(self) -> SomesyLogLevel:
        """Return log level derived from this configuration."""
        return SomesyLogLevel.from_flags(
            info=self.show_info, verbose=self.verbose, debug=self.debug
        )

    def update_log_level(self, log_level: SomesyLogLevel):
        """Update config flags according to passed log level.

        Exactly the flag matching `log_level` is enabled, the others are reset.
        """
        self.show_info = log_level == SomesyLogLevel.INFO
        self.verbose = log_level == SomesyLogLevel.VERBOSE
        self.debug = log_level == SomesyLogLevel.DEBUG

    def get_input(self) -> SomesyInput:
        """Based on the somesy config, load the complete somesy input.

        The configuration stored in the input file is merged with this
        configuration and attached to the returned input object.
        """
        # get metadata+config from specified input file
        somesy_input = SomesyInput.from_input_file(self.input_file)
        # update input with merged config settings (cli overrides config file)
        dct: Dict[str, Any] = {}
        dct.update(somesy_input.config or {})
        dct.update(self.dict())
        somesy_input.config = SomesyConfig(**dct)
        return somesy_input

216 

217 

218# -------- 

219# Project metadata model (modified from CITATION.cff) 

220 

221 

class Person(SomesyBaseModel):
    """Metadata about a person in the context of a software project.

    The schema follows CITATION.cff 1.2, with modifications and extensions
    needed by somesy.
    """

    # NOTE: we rely on the defined aliases for direct CITATION.cff interoperability.

    orcid: Annotated[
        Optional[AnyUrl],
        Field(
            description="The person's ORCID url **(not required, but highly suggested)**."
        ),
    ]

    email: Annotated[
        str,
        Field(
            regex=r"^[\S]+@[\S]+\.[\S]{2,}$", description="The person's email address."
        ),
    ]

    family_names: Annotated[
        str, Field(alias="family-names", description="The person's family names.")
    ]
    given_names: Annotated[
        str, Field(alias="given-names", description="The person's given names.")
    ]

    name_particle: Annotated[
        Optional[str],
        Field(
            alias="name-particle",
            description="The person's name particle, e.g., a nobiliary particle or a preposition meaning 'of' or 'from' (for example 'von' in 'Alexander von Humboldt').",
            examples=["von"],
        ),
    ]
    name_suffix: Annotated[
        Optional[str],
        Field(
            alias="name-suffix",
            description="The person's name-suffix, e.g. 'Jr.' for Sammy Davis Jr. or 'III' for Frank Edwin Wright III.",
            examples=["Jr.", "III"],
        ),
    ]
    alias: Annotated[Optional[str], Field(description="The person's alias.")]

    affiliation: Annotated[
        Optional[str], Field(description="The person's affiliation.")
    ]

    address: Annotated[Optional[str], Field(description="The person's address.")]
    city: Annotated[Optional[str], Field(description="The person's city.")]
    country: Annotated[Optional[Country], Field(description="The person's country.")]
    fax: Annotated[Optional[str], Field(description="The person's fax number.")]
    post_code: Annotated[
        Optional[str], Field(alias="post-code", description="The person's post-code.")
    ]
    region: Annotated[Optional[str], Field(description="The person's region.")]
    tel: Annotated[Optional[str], Field(description="The person's phone number.")]

    # ----
    # somesy-specific extensions
    author: Annotated[
        bool,
        Field(
            description="Indicates whether the person is an author of the project (i.e. for citations)."
        ),
    ] = False
    maintainer: Annotated[
        bool,
        Field(
            description="Indicates whether the person is a maintainer of the project (i.e. for contact)."
        ),
    ] = False

    # NOTE: CFF 1.3 (once done) might provide ways for refined contributor description. That should be implemented here.
    contribution: Annotated[
        Optional[str],
        Field(description="Summary of how the person contributed to the project."),
    ]
    contribution_types: Annotated[
        Optional[List[ContributionTypeEnum]],
        Field(
            description="Relevant types of contributions (see https://allcontributors.org/docs/de/emoji-key).",
            min_items=1,
        ),
    ]
    contribution_begin: Annotated[
        Optional[date], Field(description="Beginning date of the contribution.")
    ]
    contribution_end: Annotated[
        Optional[date], Field(description="Ending date of the contribution.")
    ]

    # helper methods

    @property
    def full_name(self) -> str:
        """Return the space-separated full name, skipping empty name parts.

        Parts are joined in the order: given names, name particle,
        family names, name suffix.
        """
        parts = (
            self.given_names,
            self.name_particle,
            self.family_names,
            self.name_suffix,
        )
        return " ".join(part for part in parts if part)

    def same_person(self, other) -> bool:
        """Heuristically decide whether `other` describes the same real person.

        The decision is based on ORCID, email and full name, in that order
        (depending on which of them are available on both records).
        """
        # two ORCIDs are real identifiers and settle the question on their own
        both_have_orcid = self.orcid is not None and other.orcid is not None
        if both_have_orcid:
            return self.orcid == other.orcid

        # an email address belongs to exactly one person => equal email, same person
        both_have_email = self.email is not None and other.email is not None
        if both_have_email and self.email == other.email:
            return True

        # no orcids and no/distinct emails (people often have several addresses):
        # fall back to the full name, which is always present
        return self.full_name == other.full_name

360 

361 

class ProjectMetadata(SomesyBaseModel):
    """Pydantic model describing the project metadata given as input."""

    class Config:
        """Pydantic config."""

        extra = Extra.ignore

    @validator("people")
    def ensure_distinct_people(cls, people):
        """Reject person lists in which two entries describe the same person."""
        for idx, first in enumerate(people):
            # only compare with later entries, each pair is checked once
            for second in people[idx + 1 :]:
                if not first.same_person(second):
                    continue
                p1 = pretty_repr(json.loads(first.json()))
                p2 = pretty_repr(json.loads(second.json()))
                msg = f"Same person is listed twice:\n{p1}\n{p2}"
                raise ValueError(msg)
        return people

    @validator("people")
    def at_least_one_author(cls, people):
        """Require that some person in the list is marked as author."""
        for person in people:
            if person.author:
                return people
        raise ValueError("At least one person must be an author of this project.")

    name: Annotated[str, Field(description="Project name.")]
    description: Annotated[str, Field(description="Project description.")]
    version: Annotated[Optional[str], Field(description="Project version.")]
    license: Annotated[LicenseEnum, Field(description="SPDX License string.")]

    repository: Annotated[
        Optional[AnyUrl],
        Field(description="URL of the project source code repository."),
    ] = None
    homepage: Annotated[
        Optional[AnyUrl], Field(description="URL of the project homepage.")
    ] = None

    keywords: Annotated[
        Optional[List[str]],
        Field(min_items=1, description="Keywords that describe the project."),
    ] = None

    people: Annotated[
        List[Person],
        Field(
            min_items=1, description="Project authors, maintainers and contributors."
        ),
    ]

    def authors(self):
        """Return the list of people whose `author` flag is set."""
        return list(filter(lambda person: person.author, self.people))

    def maintainers(self):
        """Return the list of people whose `maintainer` flag is set."""
        return list(filter(lambda person: person.maintainer, self.people))

421 

422 

class SomesyInput(SomesyBaseModel):
    """The complete somesy input file (`somesy.toml`) or section (`pyproject.toml`)."""

    # Path of the file this input was loaded from (set by `from_input_file`).
    # Explicit None default, so that reading it on an instance that was not
    # loaded from a file does not fail with an AttributeError.
    _origin: Optional[Path] = PrivateAttr(None)

    project: Annotated[
        ProjectMetadata,
        Field(description="Project metadata to be used and synchronized."),
    ]
    config: Annotated[
        Optional[SomesyConfig],
        Field(description="somesy tool configuration (matches CLI flags)."),
    ]

    def is_somesy_file(self) -> bool:
        """Return whether this somesy input is from a somesy config file.

        That means, returns False if it is from pyproject.toml or package.json.
        If the origin is unknown, the current directory path is checked instead.
        """
        return self.is_somesy_file_path(self._origin or Path("."))

    @classmethod
    def is_somesy_file_path(cls, path: Path) -> bool:
        """Return whether the path looks like a somesy config file.

        That means, returns False if it is e.g. pyproject.toml or package.json.
        The check is purely name-based (path must end with `somesy.toml`).
        """
        return str(path).endswith("somesy.toml")

    @classmethod
    def from_input_file(cls, path: Path) -> SomesyInput:
        """Load somesy input from given file and remember the file as its origin."""
        content = get_input_content(path)
        # use cls (not the hard-coded class), so that subclasses load as themselves
        ret = cls(**content)
        ret._origin = path
        return ret