Coverage for src/metador_core/harvester/__init__.py: 82%
139 statements
« prev ^ index » next — coverage.py v7.3.2, created at 2023-11-02 09:33 +0000
1"""Plugin group for metadata harvesters."""
2from __future__ import annotations
4from abc import ABC, ABCMeta, abstractmethod
5from pathlib import Path
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Callable,
10 ClassVar,
11 Dict,
12 Iterable,
13 Set,
14 Type,
15 TypeVar,
16 Union,
17)
19from overrides import overrides
20from pydantic import Extra, ValidationError
21from typing_extensions import TypeAlias
23from ..plugin import interface as pg
24from ..plugins import schemas
25from ..schema import MetadataSchema
26from ..schema.core import BaseModelPlus
27from ..schema.partial import PartialFactory, PartialModel
28from ..schema.plugins import PluginRef
# Registered name of the harvester plugin group.
HARVESTER_GROUP_NAME = "harvester"


if TYPE_CHECKING:
    # for static type checkers, the generic PluginRef is precise enough
    SchemaPluginRef: TypeAlias = PluginRef
else:
    # at runtime, use the schema plugin group's specialized PluginRef subclass
    SchemaPluginRef = schemas.PluginRef
class HarvesterPlugin(pg.PluginBase):
    """Plugin info model for harvester plugins (see `PGHarvester.Plugin`)."""

    returns: SchemaPluginRef
    """Schema returned by this harvester."""
44S = TypeVar("S", bound=MetadataSchema)
class HarvesterArgs(BaseModelPlus):
    """Base class for harvester arguments."""

    class Config(BaseModelPlus.Config):
        # reject unknown arguments instead of silently ignoring them
        extra = Extra.forbid
class HarvesterArgsPartial(PartialFactory):
    """Base class for partial harvester arguments."""

    # all generated partial argument models derive from HarvesterArgs
    base_model = HarvesterArgs
class HarvesterMetaMixin(type):
    """Metaclass mixin exposing the partial-arguments model on harvester classes."""

    # NOTE(review): appears unused here — the partials cache lives inside
    # HarvesterArgsPartial.get_partial; verify before removing.
    _args_partials: Dict[Type, Type] = {}

    @property
    # @cache not needed, partials take care of that themselves
    def PartialArgs(self):
        """Access the partial schema based on the current schema."""
        return HarvesterArgsPartial.get_partial(self.Args)
class HarvesterMeta(HarvesterMetaMixin, ABCMeta):
    """Metaclass combining the PartialArgs mixin with ABCMeta (for @abstractmethod)."""

    ...
class Harvester(ABC, metaclass=HarvesterMeta):
    """Base class for metadata harvesters.

    A harvester is a class that can be instantiated to extract
    metadata according to some schema plugin (the returned schema
    must be defined as the `Plugin.returns` attribute).

    Override the inner `Harvester.Args` class with a subclass to add arguments.
    This works exactly like schema definition and is simply
    another pydantic model, so you can use both field and root
    validators to check whether the arguments are making sense.

    If all you need is getting a file path, consider using
    `FileHarvester`, which already defines a `filepath` argument.

    Override the `run()` method to implement the metadata harvesting
    based on a validated configuration located at `self.args`.
    """

    # harvester plugin info (declared by each concrete plugin subclass)
    Plugin: ClassVar[HarvesterPlugin]

    Args: TypeAlias = HarvesterArgs
    """Arguments to be passed to the harvester."""

    # current configuration; partial until completed by `harvest()`
    args: Union[Harvester.Args, HarvesterArgsPartial]

    @property
    def schema(self):
        """Partial schema class returned by this harvester.

        Provided for implementation convenience.
        """
        return schemas[self.Plugin.returns.name].Partial

    # ----
    # configuring and checking harvester instances:

    def __init__(self, **kwargs):
        """Initialize harvester with (partial) configuration."""
        self.args = type(self).PartialArgs.parse_obj(kwargs)

    def __call__(self, **kwargs):
        """Return copy of harvester with updated configuration."""
        args = type(self).PartialArgs.parse_obj(kwargs)
        merged = self.args.merge_with(args)
        return type(self)(**merged.dict())

    def __repr__(self) -> str:
        """Return debug representation including the current arguments."""
        return f"{type(self).__name__}(args={repr(self.args)})"

    def harvest(self):
        """Check provided arguments and run the harvester.

        Call this when a harvester is configured and it should be executed.

        Raises:
            ValueError: If the configuration is incomplete or invalid.
        """
        try:
            self.args = self.args.from_partial()
        except ValidationError as e:
            hname = type(self).__name__
            msg = f"{hname} configuration incomplete or invalid:\n{str(e)}"
            # FIX: chain the original ValidationError so the root cause
            # remains visible in the traceback (was a bare `raise`).
            raise ValueError(msg) from e
        return self.run()

    # ----
    # to be overridden

    @abstractmethod
    def run(self) -> MetadataSchema:
        """Do the harvesting according to instance configuration and return metadata.

        Override this method with your custom metadata harvesting logic, based
        on configuration provided in `self.args`.

        All actual, possibly time-intensive harvesting computations (accessing
        resources, running external code, etc.) MUST be performed in this method.

        Ensure that your harvester does not interfere with anything else, such
        as other harvesters running on possibly the same data - open resources
        such as files in read-only mode. You SHOULD avoid creating tempfiles, or
        ensure that these are managed, fresh and cleaned up when you are done
        (e.g. if you need to run external processes that dump outputs to a
        file).

        Returns:
            A fresh instance of type `self.schema` containing harvested metadata.
        """
class FileHarvester(Harvester):
    """Harvester for processing a single file path.

    The file path is usually not fixed at construction time, but supplied
    later via the `filepath` argument (e.g. by `file_harvester_pipeline`).
    """

    class Args(Harvester.Args):
        filepath: Path
class MetadataLoader(FileHarvester):
    # Partial schema class used to parse the metadata file
    # (set on subclasses created by `metadata_loader`).
    _partial_schema: PartialModel
    # Maps the given file path to the path of the metadata file to load
    # (identity or a sidecar convention; set by `metadata_loader`).
    _sidecar_func: Callable[[Path], Path]

    def run(self):
        """Load partial metadata from the (possibly transformed) file path.

        Returns an empty partial instance if the target file does not exist;
        otherwise the parsed partial metadata (parse errors propagate).
        """
        path = self.args.filepath
        # look up through the class __dict__ so the stored plain function is
        # not turned into a bound method by attribute access on the instance
        func = type(self).__dict__.get("_sidecar_func")
        used_path = func(path)
        if not used_path.is_file():
            return self._partial_schema()
        return self._partial_schema.parse_file(used_path)
186# ----
def _schema_arg(obj: Union[str, Type[MetadataSchema]]) -> Type[MetadataSchema]:
    """Return installed schema by name or same schema as passed.

    Ensures that passed object is in fact a valid, installed schema.
    """
    if isinstance(obj, str):
        schema_name = obj
    else:
        schema_name = obj.Plugin.name
    return schemas[schema_name]
class PGHarvester(pg.PluginGroup[Harvester]):
    """Harvester plugin group interface."""

    class Plugin:
        name = HARVESTER_GROUP_NAME
        version = (0, 1, 0)
        requires = [PluginRef(group="plugingroup", name="schema", version=(0, 1, 0))]
        plugin_class = Harvester
        plugin_info_class = HarvesterPlugin

    def __post_init__(self):
        # schema name -> names of harvesters producing that schema
        self._harvesters_for: Dict[str, Set[str]] = {}

    def plugin_deps(self, plugin):
        """Return refs of plugins this harvester depends on (its target schema)."""
        if p := plugin.Plugin.returns:
            return {p}

    @overrides
    def check_plugin(self, ep_name: str, plugin: Type[Harvester]):
        """Check that the returned schema is installed and compatible.

        Raises:
            TypeError: If the schema is missing or incompatible with the
                harvester's declared schema reference.
        """
        hv_ref = plugin.Plugin.returns

        schema_name = hv_ref.name
        schema = schemas[schema_name]
        if not schema:
            raise TypeError(f"{ep_name}: Schema '{schema_name}' not installed!")

        inst_ref = schema.Plugin.ref()
        if not inst_ref.supports(hv_ref):
            msg = f"{ep_name}: Installed schema {inst_ref} incompatible with harvester schema {hv_ref}!"
            raise TypeError(msg)

    @overrides
    def init_plugin(self, plugin):
        """Add harvester to harvester lookup table."""
        h_name = plugin.Plugin.name
        schema = plugin.Plugin.returns
        # setdefault replaces the explicit membership check + insert
        self._harvesters_for.setdefault(schema.name, set()).add(h_name)

    def for_schema(
        self,
        schema: Union[str, MetadataSchema],
        *,
        include_children: bool = False,
        include_parents: bool = False,
    ) -> Set[str]:
        """List installed harvesters for the given metadata schema.

        To extend the query to parent or child schemas, set `include_children`
        and/or `include_parents` accordingly.

        Harvesters for child schemas are always compatible with the schema.
        (assuming correct implementation of the child schemas),

        Harvesters for parent schemas can be incompatible in some circumstances
        (specifically, if the parent accepts values for some field that are
        forbidden in the more specialized schema).

        Args:
            schema: schema name or class for which to return harvesters
            include_children: Also include results for installed children
            include_parents: Also include results for parent schemas

        Returns:
            Set of harvesters.
        """
        schema_name = _schema_arg(schema).Plugin.name
        # FIX: a schema without any registered harvester yields an empty set
        # instead of raising KeyError (child/parent schemas often have none)
        ret = set(self._harvesters_for.get(schema_name, ()))
        if include_children:
            for child in schemas.children(schema_name):
                ret = ret.union(self.for_schema(child))
        if include_parents:
            for parent in schemas.parent_path(schema_name)[:-1]:
                ret = ret.union(self.for_schema(parent))
        return ret
def _harvest_source(schema: Type[S], obj: Union[Path, Harvester]) -> S:
    """Harvest partial metadata from the passed Harvester or Path into target schema."""
    if isinstance(obj, Harvester):
        return obj.harvest()
    if isinstance(obj, Path):
        # wrap the path in a metadata-file loader for the target schema
        return metadata_loader(schema)(filepath=obj).harvest()
    raise ValueError("Metadata source must be a Harvester or a Path!")
def _identity(x):
    """Return the argument unchanged (default path transformation in `metadata_loader`)."""
    return x
289# harvesting helpers:
def def_sidecar_func(path: Path):
    """Return sidecar filename for a given file path using the default convention.

    This means, the path gets a '_meta.yaml' suffix attached.
    """
    sidecar_name = str(path) + "_meta.yaml"
    return Path(sidecar_name)
def metadata_loader(
    schema: Type[MetadataSchema], *, use_sidecar: bool = False, sidecar_func=None
) -> Type[MetadataLoader]:
    """Return harvester for partial metadata files of specified schema.

    Will return an empty partial schema in case the file does not exist.

    The fields provided in the file must be valid, or an exception will be raised.
    This is to avoid the situation where a user intends to add or override
    harvester metadata, but the fields are silently ignored, leading to possible
    surprise and confusion.

    By default, the returned harvester will attempt to parse the provided file.

    Set `use_sidecar` if you intend to pass it filenames of data files, but
    want it to instead load metadata from a sidecar file.

    Provide a `sidecar_func` if you want to change the used sidecar file convention.
    By default, `def_sidecar_func` is used to transform the path.
    """
    if use_sidecar:
        # custom convention if given, otherwise the default one
        path_transform = sidecar_func or def_sidecar_func
    else:
        # parse the passed file directly
        path_transform = _identity

    class XLoader(MetadataLoader):
        _partial_schema = schema.Partial
        _sidecar_func = path_transform

    # name the generated class after the schema it loads
    loader_name = f"{schema.__name__}Loader"
    XLoader.__qualname__ = XLoader.__name__ = loader_name
    return XLoader
def configure(*harvesters: Union[Harvester, Type[Harvester]], **kwargs):
    """Given a sequence of harvesters, configure them all at once.

    Can be used to set the same parameter in all of them easily.
    """
    # lazily yield a (re)configured copy of each harvester
    for harvester in harvesters:
        yield harvester(**kwargs)
def file_harvester_pipeline(
    *hvs: Union[FileHarvester, Type[FileHarvester]]
) -> Callable[[Path], Any]:
    """Generate a harvesting pipeline for a file.

    Args:
        hvs: FileHarvester classes or pre-configured instances to use.
            The passed objects must be preconfigured as needed,
            except for fixing a filepath (it will be overwritten).

    Returns:
        Function that takes a file path and will return
        the harvesters configured for the passed path.
    """

    def pipeline(path: Path):
        # fix the filepath on every harvester in the pipeline
        return configure(*hvs, filepath=path)

    return pipeline
def harvest(
    schema: Type[S],
    sources: Iterable[Union[Path, Harvester]],
    *,
    ignore_invalid: bool = False,
    return_partial: bool = False,
) -> S:
    """Run a harvesting pipeline and return combined results.

    Will run the harvesters in the passed order, combining results.

    In general you can expect that if two harvesters provide the same field,
    the newer value by a later harvester will overwrite an existing one from an
    earlier harvester, or the values are combined in a suitable way.

    If converting some Harvester result to the desired schema fails,
    a `ValidationError` will be raised. To change that behaviour,
    set `ignore_invalid`, will make sure that suitable fields are still used,
    even if the given object as a whole is not fully parsable.

    Note that `ignore_invalid` only affects the conversion AFTER running the
    harvesters, it does NOT affect the way the harvesters treat invalid metadata.

    By default, it is assumed that the result of the whole pipeline can be
    converted into a fully valid non-partial schema instance.
    If this final conversion fails, an exception will be raised.
    To prevent this conversion, set `return_partial`. In that case, you will
    get the partial instance as-is and can manually call `from_partial()`.

    Args:
        schema: Class of schema to be returned.
        sources: List of sources (Paths or Harvester instances).
        ignore_invalid: Whether to ignore invalid fields in Harvester outputs.
        return_partial: Whether to return raw partial instance (default: False).

    Returns:
        Metadata object with combined results.
    """
    partial_cls = schema.Partial

    # harvest each source and cast the result into the target partial schema
    # (NOTE: in principle, the harvesting could be parallelized)
    collected = [
        partial_cls.cast(_harvest_source(schema, src), ignore_invalid=ignore_invalid)
        for src in sources
    ]

    # merge in the provided order (later results take precedence)
    combined = partial_cls.merge(*collected)

    # hand back the raw partial, or complete it into a full schema instance
    if return_partial:
        return combined
    return combined.from_partial()