
Intake esm

IntakeESMAssets

Bases: Backend

Searches an Intake catalog to provide a stream of assets for processing. Uses an Intake catalog (https://intake.readthedocs.io/) as the source of file objects.

Method name: intake

Example Configuration

.. code-block:: yaml

    - method: intake_esm
      inputs:
        href_term: url

Source code in extraction_methods/plugins/assets/backends/intake_esm.py
class IntakeESMAssets(Backend):
    """
    Searches an Intake catalog to provide a stream of assets for processing.
    Uses an `Intake catalog <https://intake.readthedocs.io/>`_
    as a source for file objects.

    **Method name:** ``intake``

    Example Configuration:
        .. code-block:: yaml

            - method: intake_esm
              inputs:
                href_term: url
    """

    input_class = IntakeESMAssetsInput

    @update_input
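    def run(self, body: dict[str, Any]) -> Iterator[dict[str, Any]]:
        # Open the ESM datastore referenced by input_term, passing through any extra options.
        catalog = intake.open_esm_datastore(
            self.input.input_term, **self.input.datastore_kwargs
        )

        # Narrow the catalog only when search criteria were configured.
        if search_kwargs := self.input.search_kwargs:
            catalog = catalog.search(**search_kwargs)

        # Emit one asset per catalog row that has a truthy value in the href column.
        for _, row in catalog.df.iterrows():
            if href := getattr(row, self.input.href_term):
                yield {
                    "href": href,
                }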
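
The row loop in `run` can be illustrated without a real catalog. The following is a minimal sketch, assuming plain dictionaries stand in for the rows of `catalog.df`; the function name `iter_assets`, the sample rows, and the example URLs are hypothetical and not part of the library. One difference to note: `dict.get` silently skips a row that lacks the key, whereas `getattr` on a pandas row would raise an `AttributeError` for a missing column.

```python
from typing import Any, Iterator


def iter_assets(
    rows: list[dict[str, Any]], href_term: str = "path"
) -> Iterator[dict[str, Any]]:
    """Yield one {"href": ...} asset per row with a non-empty href_term value."""
    for row in rows:
        # Rows with a missing or empty value are skipped, as in the truthiness
        # check used by the backend.
        if href := row.get(href_term):
            yield {"href": href}


# Hypothetical rows standing in for catalog.df after an optional search.
rows = [
    {"variable": "tas", "url": "https://example.org/tas.nc"},
    {"variable": "pr", "url": ""},
    {"variable": "psl", "url": "https://example.org/psl.nc"},
]

assets = list(iter_assets(rows, href_term="url"))
print(assets)
```

Only the first and third rows produce an asset here, because the second row has an empty `url` value.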

IntakeESMAssetsInput

Bases: Input

Model for IntakeESM Assets Backend Input.

Parameters:

Name              Type            Description                 Default
input_term        str             term for method to run on.  '$uri'
href_term         str             term to use for href.       'path'
datastore_kwargs  dict[str, Any]  kwargs to open datastore.   {}
search_kwargs     dict[str, Any]  kwargs for search.          {}

Source code in extraction_methods/plugins/assets/backends/intake_esm.py
class IntakeESMAssetsInput(Input):
    """
    Model for IntakeESM Assets Backend Input.
    """

    input_term: str = Field(
        default="$uri",
        description="term for method to run on.",
    )
    href_term: str = Field(
        default="path",
        description="term to use for href.",
    )
    datastore_kwargs: dict[str, Any] = Field(
        default={},
        description="kwargs to open datastore.",
    )
    search_kwargs: dict[str, Any] = Field(
        default={},
        description="kwargs for search.",
    )
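
To show how the documented defaults combine with configured values, here is a minimal sketch that mirrors the four fields with a standard-library dataclass. It is an illustrative stand-in, not the library's model: the class name `IntakeESMInputSketch` is hypothetical, and the `variable_id` search key is an assumed catalog column chosen only for the example.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class IntakeESMInputSketch:
    """Hypothetical stand-in mirroring the documented fields and defaults."""

    input_term: str = "$uri"
    href_term: str = "path"
    # default_factory gives every instance its own empty dict.
    datastore_kwargs: dict[str, Any] = field(default_factory=dict)
    search_kwargs: dict[str, Any] = field(default_factory=dict)


# Rough equivalent of an `inputs:` mapping that sets href_term and a search filter.
# "variable_id" is an assumed column name, used only for illustration.
configured = IntakeESMInputSketch(
    href_term="url",
    search_kwargs={"variable_id": "tas"},
)

print(configured.input_term)        # unset, so the default applies
print(configured.href_term)         # overridden by the configuration
print(configured.datastore_kwargs)  # unset, so an empty dict
```

Fields left out of the configuration keep their defaults, so in the example configuration above only `href_term` changes and the datastore is still opened from `$uri` with no extra keyword arguments.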