Coverage for src/metador_core/harvester/__init__.py: 82% (139 statements)


1"""Plugin group for metadata harvesters.""" 

2from __future__ import annotations 

3 

4from abc import ABC, ABCMeta, abstractmethod 

5from pathlib import Path 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Callable, 

10 ClassVar, 

11 Dict, 

12 Iterable, 

13 Set, 

14 Type, 

15 TypeVar, 

16 Union, 

17) 

18 

19from overrides import overrides 

20from pydantic import Extra, ValidationError 

21from typing_extensions import TypeAlias 

22 

23from ..plugin import interface as pg 

24from ..plugins import schemas 

25from ..schema import MetadataSchema 

26from ..schema.core import BaseModelPlus 

27from ..schema.partial import PartialFactory, PartialModel 

28from ..schema.plugins import PluginRef 

29 

30HARVESTER_GROUP_NAME = "harvester" 

31 

32 

33if TYPE_CHECKING: 

34 SchemaPluginRef: TypeAlias = PluginRef 

35else: 

36 SchemaPluginRef = schemas.PluginRef 

37 

38 

39class HarvesterPlugin(pg.PluginBase): 

40 returns: SchemaPluginRef 

41 """Schema returned by this harvester.""" 

42 

43 

44S = TypeVar("S", bound=MetadataSchema) 

45 

46 

47class HarvesterArgs(BaseModelPlus): 

48 """Base class for harvester arguments.""" 

49 

50 class Config(BaseModelPlus.Config): 

51 extra = Extra.forbid 

52 

53 
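
# Because `HarvesterArgs` forbids extra fields, unknown arguments are rejected
# at parse time instead of being silently dropped (standard pydantic behavior;
# an illustrative sketch):
#
#     HarvesterArgs()         # OK - the base class has no required fields
#     HarvesterArgs(bogus=1)  # raises ValidationError (extra fields not permitted)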


class HarvesterArgsPartial(PartialFactory):
    """Factory for partial (all-fields-optional) harvester argument models."""

    base_model = HarvesterArgs


class HarvesterMetaMixin(type):
    _args_partials: Dict[Type, Type] = {}

    @property
    # @cache not needed, partials take care of that themselves
    def PartialArgs(self):
        """Access the partial args model derived from the current `Args` model."""
        return HarvesterArgsPartial.get_partial(self.Args)


class HarvesterMeta(HarvesterMetaMixin, ABCMeta):
    ...
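
# A sketch of what the metaclass property above provides (illustrative;
# `SomeHarvester` stands for any Harvester subclass whose `Args` model has
# required fields):
#
#     partial = SomeHarvester.PartialArgs.parse_obj({})  # incomplete config is OK
#     partial.from_partial()  # raises ValidationError while fields are missing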



class Harvester(ABC, metaclass=HarvesterMeta):
    """Base class for metadata harvesters.

    A harvester is a class that can be instantiated to extract
    metadata according to some schema plugin (the returned schema
    must be declared as the `Plugin.returns` attribute).

    Override the inner `Harvester.Args` class with a subclass to add arguments.
    This works exactly like a schema definition and is simply
    another pydantic model, so you can use both field and root
    validators to check whether the arguments make sense.

    If all you need is a file path, consider using
    `FileHarvester`, which already defines a `filepath` argument.

    Override the `run()` method to implement the metadata harvesting
    based on the validated configuration located at `self.args`.
    """

    Plugin: ClassVar[HarvesterPlugin]

    Args: TypeAlias = HarvesterArgs
    """Arguments to be passed to the harvester."""

    args: Union[Harvester.Args, HarvesterArgsPartial]

    @property
    def schema(self):
        """Partial schema class returned by this harvester.

        Provided for implementation convenience.
        """
        return schemas[self.Plugin.returns.name].Partial

    # ----
    # configuring and checking harvester instances:

    def __init__(self, **kwargs):
        """Initialize the harvester with a (partial) configuration."""
        self.args = type(self).PartialArgs.parse_obj(kwargs)

    def __call__(self, **kwargs):
        """Return a copy of the harvester with an updated configuration."""
        args = type(self).PartialArgs.parse_obj(kwargs)
        merged = self.args.merge_with(args)
        return type(self)(**merged.dict())

    def __repr__(self):
        return f"{type(self).__name__}(args={repr(self.args)})"

    def harvest(self):
        """Check the provided arguments and run the harvester.

        Call this when the harvester is configured and should be executed.
        """
        try:
            self.args = self.args.from_partial()
        except ValidationError as e:
            hname = type(self).__name__
            msg = f"{hname} configuration incomplete or invalid:\n{str(e)}"
            raise ValueError(msg) from e
        return self.run()
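
    # Typical configuration flow (illustrative sketch; `SomeHarvester` stands
    # for any concrete Harvester subclass):
    #
    #     h = SomeHarvester()      # empty (partial) configuration
    #     h = h(some_arg="value")  # returns an updated copy
    #     meta = h.harvest()       # validates the args, then calls run()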

    # ----
    # to be overridden

    @abstractmethod
    def run(self) -> MetadataSchema:
        """Do the harvesting according to the instance configuration and return the metadata.

        Override this method with your custom metadata harvesting logic, based
        on the configuration provided in `self.args`.

        All actual, possibly time-intensive harvesting computations (accessing
        resources, running external code, etc.) MUST be performed in this method.

        Ensure that your harvester does not interfere with anything else, such
        as other harvesters possibly running on the same data: open resources
        such as files in read-only mode. You SHOULD avoid creating tempfiles, or
        ensure that these are managed, fresh, and cleaned up when you are done
        (e.g. if you need to run external processes that dump their output to a
        file).

        Returns:
            A fresh instance of type `self.schema` containing the harvested metadata.
        """


class FileHarvester(Harvester):
    """Harvester for processing a single file path.

    The file path is provided via the `filepath` argument, which is set
    like any other harvester argument (see the usage sketch below).
    """

    class Args(Harvester.Args):
        filepath: Path
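
# A minimal end-to-end sketch of a custom harvester (illustrative only - the
# plugin name, version, and schema fields below are hypothetical, and the
# `Plugin` declaration must follow the usual plugin interface conventions):
#
#     class WordCountHarvester(FileHarvester):
#         class Plugin:
#             name = "example.wordcount"
#             version = (0, 1, 0)
#             returns = SchemaPluginRef(name="example.wordcount", version=(0, 1, 0))
#
#         def run(self):
#             words = self.args.filepath.read_text().split()
#             return self.schema(word_count=len(words))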


class MetadataLoader(FileHarvester):
    _partial_schema: PartialModel
    _sidecar_func: Callable[[Path], Path]

    def run(self):
        path = self.args.filepath
        # look up the function via __dict__ so it is not bound as a method
        func = type(self).__dict__.get("_sidecar_func")
        used_path = func(path)
        if not used_path.is_file():
            return self._partial_schema()
        return self._partial_schema.parse_file(used_path)


# ----


def _schema_arg(obj: Union[str, Type[MetadataSchema]]) -> Type[MetadataSchema]:
    """Return the installed schema by name, or the schema itself if one was passed.

    Ensures that the passed object is in fact a valid, installed schema.
    """
    name = obj if isinstance(obj, str) else obj.Plugin.name
    return schemas[name]


class PGHarvester(pg.PluginGroup[Harvester]):
    """Harvester plugin group interface."""

    class Plugin:
        name = HARVESTER_GROUP_NAME
        version = (0, 1, 0)
        requires = [PluginRef(group="plugingroup", name="schema", version=(0, 1, 0))]
        plugin_class = Harvester
        plugin_info_class = HarvesterPlugin

    def __post_init__(self):
        self._harvesters_for: Dict[str, Set[str]] = {}

    def plugin_deps(self, plugin):
        if p := plugin.Plugin.returns:
            return {p}

    @overrides
    def check_plugin(self, ep_name: str, plugin: Type[Harvester]):
        hv_ref = plugin.Plugin.returns

        schema_name = hv_ref.name
        schema = schemas[schema_name]
        if not schema:
            raise TypeError(f"{ep_name}: Schema '{schema_name}' not installed!")

        inst_ref = schema.Plugin.ref()
        if not inst_ref.supports(hv_ref):
            msg = f"{ep_name}: Installed schema {inst_ref} incompatible with harvester schema {hv_ref}!"
            raise TypeError(msg)

    @overrides
    def init_plugin(self, plugin):
        """Add the harvester to the harvester lookup table."""
        h_name = plugin.Plugin.name
        schema = plugin.Plugin.returns
        if schema.name not in self._harvesters_for:
            self._harvesters_for[schema.name] = set()
        self._harvesters_for[schema.name].add(h_name)

    def for_schema(
        self,
        schema: Union[str, Type[MetadataSchema]],
        *,
        include_children: bool = False,
        include_parents: bool = False,
    ) -> Set[str]:
        """List installed harvesters for the given metadata schema.

        To extend the query to parent or child schemas, set `include_children`
        and/or `include_parents` accordingly.

        Harvesters for child schemas are always compatible with the schema
        (assuming correct implementation of the child schemas).

        Harvesters for parent schemas can be incompatible in some circumstances
        (specifically, if the parent accepts values for some field that are
        forbidden in the more specialized schema).

        Args:
            schema: Schema name or class for which to return harvesters.
            include_children: Also include results for installed child schemas.
            include_parents: Also include results for parent schemas.

        Returns:
            Set of harvester names.
        """
        schema_name = _schema_arg(schema).Plugin.name
        # use .get, as there might be no harvesters registered for this schema
        ret = set(self._harvesters_for.get(schema_name, set()))
        if include_children:
            for child in schemas.children(schema_name):
                ret = ret.union(self.for_schema(child))
        if include_parents:
            for parent in schemas.parent_path(schema_name)[:-1]:
                ret = ret.union(self.for_schema(parent))
        return ret
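
# Query sketch (illustrative; assumes access to the installed harvester plugin
# group instance, here called `harvesters`, and made-up plugin names):
#
#     harvesters.for_schema("example.wordcount", include_children=True)
#     # -> {"example.wordcount.simple", ...}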


def _harvest_source(schema: Type[S], obj: Union[Path, Harvester]) -> S:
    """Harvest partial metadata from the passed Harvester or Path into target schema."""
    if not isinstance(obj, (Path, Harvester)):
        raise ValueError("Metadata source must be a Harvester or a Path!")
    if isinstance(obj, Path):
        obj = metadata_loader(schema)(filepath=obj)
    return obj.harvest()


def _identity(x):
    return x


# harvesting helpers:


def def_sidecar_func(path: Path):
    """Return the sidecar filename for a given file path, using the default convention.

    That is, the suffix '_meta.yaml' is appended to the path.
    """
    return Path(f"{str(path)}_meta.yaml")
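
# For example, for a data file "data/table.csv" the default sidecar path is:
#
#     def_sidecar_func(Path("data/table.csv"))  # -> Path("data/table.csv_meta.yaml")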


def metadata_loader(
    schema: Type[MetadataSchema], *, use_sidecar: bool = False, sidecar_func=None
) -> Type[MetadataLoader]:
    """Return a harvester for partial metadata files of the specified schema.

    The returned harvester yields an empty partial schema in case the file does not exist.

    The fields provided in the file must be valid, or an exception will be raised.
    This is to avoid the situation where a user intends to add or override
    harvested metadata, but the fields are silently ignored, leading to possible
    surprise and confusion.

    By default, the returned harvester will attempt to parse the provided file itself.

    Set `use_sidecar` if you intend to pass it the filenames of data files, but
    want it to load the metadata from a sidecar file instead.

    Provide a `sidecar_func` if you want to change the sidecar file naming convention.
    By default, `def_sidecar_func` is used to transform the path.
    """
    used_sidecar_f = _identity
    if use_sidecar:
        used_sidecar_f = sidecar_func if sidecar_func else def_sidecar_func

    class XLoader(MetadataLoader):
        _partial_schema = schema.Partial
        _sidecar_func = used_sidecar_f

    name = f"{schema.__name__}Loader"
    XLoader.__qualname__ = XLoader.__name__ = name
    return XLoader
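
# Usage sketch (illustrative; `BibMeta` stands for some installed schema class,
# which - like all metador schemas - provides a `Partial` attribute):
#
#     BibLoader = metadata_loader(BibMeta, use_sidecar=True)
#     meta = BibLoader(filepath=Path("data.csv")).harvest()
#     # loads partial metadata from "data.csv_meta.yaml", if that file exists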


def configure(*harvesters: Union[Harvester, Type[Harvester]], **kwargs):
    """Given a sequence of harvesters, configure them all at once.

    Can be used to set the same parameter in all of them easily.
    """
    return (h(**kwargs) for h in harvesters)


def file_harvester_pipeline(
    *hvs: Union[FileHarvester, Type[FileHarvester]]
) -> Callable[[Path], Any]:
    """Generate a harvesting pipeline for a file.

    Args:
        hvs: FileHarvester classes or pre-configured instances to use.
            The passed objects must be preconfigured as needed,
            except for fixing a filepath (it will be overwritten).

    Returns:
        Function that takes a file path and will return
        the harvesters configured for the passed path.
    """
    return lambda path: configure(*hvs, filepath=path)
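
# Pipeline sketch (illustrative; `ExifHarvester` and `StatHarvester` stand for
# hypothetical FileHarvester subclasses, `SomeSchema` for a schema class):
#
#     pipeline = file_harvester_pipeline(ExifHarvester, StatHarvester(deep=True))
#     meta = harvest(SomeSchema, pipeline(Path("img.jpg")))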


def harvest(
    schema: Type[S],
    sources: Iterable[Union[Path, Harvester]],
    *,
    ignore_invalid: bool = False,
    return_partial: bool = False,
) -> S:
    """Run a harvesting pipeline and return the combined results.

    Will run the harvesters in the passed order, combining the results.

    In general you can expect that if two harvesters provide the same field,
    the value from a later harvester overwrites the one from an earlier
    harvester, or the values are combined in a suitable way.

    If converting some Harvester result to the desired schema fails,
    a `ValidationError` will be raised. To change that behaviour, set
    `ignore_invalid`, which will make sure that suitable fields are still
    used, even if the given object as a whole is not fully parsable.

    Note that `ignore_invalid` only affects the conversion AFTER running the
    harvesters, it does NOT affect the way the harvesters treat invalid metadata.

    By default, it is assumed that the result of the whole pipeline can be
    converted into a fully valid non-partial schema instance.
    If this final conversion fails, an exception will be raised.
    To prevent this conversion, set `return_partial`. In that case, you will
    get the partial instance as-is and can manually call `from_partial()`.

    Args:
        schema: Class of the schema to be returned.
        sources: List of sources (Paths or Harvester instances).
        ignore_invalid: Whether to ignore invalid fields in Harvester outputs.
        return_partial: Whether to return the raw partial instance (default: False).

    Returns:
        Metadata object with the combined results.
    """

    def cast(meta):
        # configure the cast to ignore or not ignore invalid fields
        return schema.Partial.cast(meta, ignore_invalid=ignore_invalid)

    # collect partial metadata (NOTE: in principle, this could be parallelized)
    results = map(lambda s: cast(_harvest_source(schema, s)), sources)

    # accumulate the collected and cast results in the provided order
    merged = schema.Partial.merge(*results)

    # retrieve the (completed) metadata model
    return merged if return_partial else merged.from_partial()
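
# End-to-end sketch (illustrative; `SomeSchema` stands for an installed schema
# class, `h` for a configured harvester instance):
#
#     meta = harvest(SomeSchema, [Path("dataset_meta.yaml"), h])
#     # sources are merged in order (later values win); raises unless the
#     # combined result forms a complete, valid SomeSchema instance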