Skip to content

Intake ESM

IntakeESMConf

Bases: BaseModel

IntakeESM config.

Source code in stac_generator/plugins/inputs/intake_esm.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class IntakeESMConf(BaseModel):
    """IntakeESM config.

    Settings consumed by the ``intake_esm`` input plugin
    (``IntakeESMInput``).
    """

    # Location of the intake-esm datastore; passed as the first argument to
    # ``intake.open_esm_datastore``.
    url: str = Field(
        description="URL of datastore.",
    )
    # Name of the catalog column whose value becomes each event's ``"uri"``.
    uri_term: str = Field(
        description="Attribute to use as uri.",
    )
    # Extra catalog columns copied into each event: each entry's ``key`` names
    # the catalog column and ``output_key`` the field name in the event.
    extra_terms: list[KeyOutputKey] = Field(
        default=[],
        description="List of extra attributes.",
    )
    # NOTE(review): the plugin processes only rows whose 0-based position is
    # strictly greater than this value, so -1 keeps every row and a value N
    # drops the first N + 1 rows -- not quite "number of rows to skip".
    skip: int = Field(
        default=-1,
        description="Number of rows to skip.",
    )
    # Forwarded as keyword arguments to ``intake.open_esm_datastore``.
    catalog_kwargs: dict = Field(
        default={},
        description="catalog kwargs.",
    )
    # Forwarded as keyword arguments to the datastore's ``search``; when
    # empty, no search is performed and the whole catalog is used.
    search_kwargs: dict = Field(
        default={},
        description="search kwargs.",
    )

IntakeESMInput

Bases: Input

Uses an Intake catalog (https://intake.readthedocs.io/) as a source for events.

Plugin name: intake_esm

Example Configuration (YAML; `url` and `uri_term` are required):

name: intake_esm
conf:
  url: catalog.json
  uri_term: path
Source code in stac_generator/plugins/inputs/intake_esm.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class IntakeESMInput(Input):
    """
    Uses an `Intake catalog <https://intake.readthedocs.io/>`_
    as a source for events.

    **Plugin name:** ``intake_esm``

    Each row of the (optionally searched) intake-esm datastore becomes one
    event dict containing ``"uri"`` plus any configured extra terms.

    Example Configuration:
        .. code-block:: yaml

            name: intake_esm
            conf:
              url: catalog.json
              uri_term: path
    """

    config_class = IntakeESMConf

    def run(self):
        """
        Open the configured intake-esm datastore and yield one dict per row.

        The datastore at ``conf.url`` is opened with ``conf.catalog_kwargs``
        and, when ``conf.search_kwargs`` is non-empty, narrowed with
        ``catalog.search(**conf.search_kwargs)``.

        Rows of ``catalog.df`` are visited in positional order; only rows whose
        0-based position is strictly greater than ``conf.skip`` are emitted, so
        the default ``-1`` emits every row.

        :yields: dict with ``"uri"`` set to the row's ``conf.uri_term`` column,
            plus ``extra_term.output_key`` set to the row's ``extra_term.key``
            column for each entry in ``conf.extra_terms``.
        :raises KeyError: if a configured column is missing from the catalog.
        """
        total_files = 0
        start = datetime.now()

        LOGGER.info("Opening catalog %s", self.conf.url)
        catalog = intake.open_esm_datastore(self.conf.url, **self.conf.catalog_kwargs)

        if self.conf.search_kwargs:
            LOGGER.info("Searching catalog")
            catalog = catalog.search(**self.conf.search_kwargs)

        df = catalog.df
        LOGGER.info("Found %s items", len(df))

        # Same semantics as the previous "position > skip" counter, but slicing
        # positionally avoids iterating over the skipped rows. max() keeps any
        # skip <= -1 meaning "start at the first row".
        first_row = max(self.conf.skip + 1, 0)

        for _, row in df.iloc[first_row:].iterrows():
            # Index by column label, not getattr(): attribute access on a pandas
            # Series returns Series attributes (e.g. ``name``, ``size``,
            # ``index``) instead of the column value when the names collide.
            output = {"uri": row[self.conf.uri_term]}
            LOGGER.debug("Input processing: %s", output["uri"])

            for extra_term in self.conf.extra_terms:
                output[extra_term.output_key] = row[extra_term.key]

            yield output
            total_files += 1

        # Only reached when the consumer exhausts the generator.
        end = datetime.now()
        LOGGER.info(
            "Processed %s files from %s in %s",
            total_files,
            self.conf.url,
            end - start,
        )