Skip to content

Intake esm

ElasticsearchConf

Bases: BaseModel

IntakeESM config model.

Parameters:

Name Type Description Default
filepath str

Directory or file path prefix for the catalog files.

required
namespace str

Identifier of the catalog, stored as its id.

'asset'
collection str

Term to use for the JSON file name.

'collection'
description str

Long form description of the dataset catalog.

''
Source code in stac_generator/plugins/outputs/intake_esm.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class ElasticsearchConf(BaseModel):
    """IntakeESM output config model.

    NOTE(review): despite the class name, these settings configure the
    Intake ESM output plugin; the name is kept so existing references
    keep working.
    """

    # Existing directory, or path prefix, the catalog files are written to.
    filepath: str = Field(
        description="Directory or file path prefix for the catalog files.",
    )
    # Written to the ``id`` entry of the catalog specification.
    namespace: str = Field(
        default="asset",
        description="Identifier of the catalog, stored as its id.",
    )
    # Base file name used when ``filepath`` is a directory.
    collection: str = Field(
        default="collection",
        description="Term to use for the JSON file name.",
    )
    # Written to the ``description`` entry of the catalog specification.
    description: str = Field(
        default="",
        description="Long form description of the dataset catalog.",
    )

IntakeESMOutput

Bases: Output

Outputs to an Intake ESM catalog description and a zipped CSV file at a location of your choosing.

This is only to be used for testing purposes and not suitable for large scale application.

Note also that the CSV header is constructed from the first data payload processed. If there are attribute variations across files, attribute columns may not align, which will yield an invalid catalog.

Plugin name: intake_esm_out

Example Configuration

.. code-block:: yaml

- name: intake_esm_out
  conf:
    filepath: location/to/destination_files/
    collection: my_collection
    description: A long form description of the dataset catalog.
Source code in stac_generator/plugins/outputs/intake_esm.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class IntakeESMOutput(Output):
    """
    Outputs to an Intake ESM catalog description and a zipped CSV file
    at a location of your choosing.

    This is only to be used for testing purposes and not suitable for large
    scale application.

    Note also that the CSV header is constructed from the first data payload processed. If there are attribute variations
    across files, attribute columns may not align, which will yield an invalid catalog.

    **Plugin name:** ``intake_esm_out``

    Example Configuration:
        .. code-block:: yaml

            - name: intake_esm_out
              conf:
                filepath: location/to/destination_files/
                collection: my_collection
                description: A long form description of the dataset catalog.

    """

    def __init__(self, **kwargs):
        """Resolve the catalog file paths from the plugin configuration.

        When ``conf.filepath`` is an existing directory the files are placed
        inside it and named after ``conf.collection``; otherwise
        ``conf.filepath`` itself is used as the path prefix.
        """
        super().__init__(**kwargs)

        # Bind the prefix in both cases so ``self.filepath`` is always
        # defined, not only when the configured path is a directory.
        base_path = self.conf.filepath
        if os.path.isdir(base_path):
            base_path = os.path.join(base_path, self.conf.collection)
        self.filepath = base_path

        self.json_path = self.filepath + ".json"
        self.csv_path = self.filepath + ".csv.gz"

    @staticmethod
    def properties(data):
        """Return list of property names.

        Note that results may vary from one item to the next.
        """
        return list(data["body"]["properties"].keys())

    @staticmethod
    def data2row(data):
        """Return list of property values."""
        return list(data["body"]["properties"].values())

    def to_intake_spec(self, data):
        """Return Intake specification file content.

        The attribute columns are taken from the property names of ``data``
        and the asset format is looked up from ``data["body"]["extension"]``.

        Raises:
            KeyError: if a required key is missing from ``data`` or the
                extension has no entry in ``ASSET_FORMAT``.
        """

        attributes = [{"column_name": key} for key in self.properties(data)]
        ext = data["body"]["extension"]

        spec = {
            "esmcat_version": ESMCAT_VERSION,
            "id": self.conf.namespace,
            "description": self.conf.description,
            "catalog_file": self.csv_path,
            "attributes": attributes,
            "assets": {"column_name": "path", "format": ASSET_FORMAT[ext]},
        }
        return spec

    def export(self, data: dict, **kwargs) -> None:
        """Write data to disk.

        If the JSON specification file does not exist yet, it is created
        together with a new gzipped CSV file holding the header and the
        first row. Otherwise a single row is appended to the existing CSV.

        Args:
            data: Payload whose ``body.properties`` mapping supplies the row.
            **kwargs: Ignored.
        """
        import csv
        import gzip

        if not os.path.exists(self.json_path):
            # Create catalog spec file and CSV file with header and first data row

            # Write ESM-Collection json file
            with open(self.json_path, mode="wt", encoding="utf-8") as f:
                json.dump(self.to_intake_spec(data), f)

            # Write catalog data in csv.gz format. ``newline=""`` lets the
            # csv module control line endings, and the encoding is pinned so
            # the output does not depend on the locale.
            with gzip.open(
                filename=self.csv_path, mode="wt", encoding="utf-8", newline=""
            ) as f:
                w = csv.writer(f)
                w.writerow(self.properties(data))
                w.writerow(self.data2row(data))

        else:
            # Append new data row to CSV file
            with gzip.open(
                filename=self.csv_path, mode="at", encoding="utf-8", newline=""
            ) as f:
                w = csv.writer(f)
                w.writerow(self.data2row(data))

data2row(data) staticmethod

Return list of property values.

Source code in stac_generator/plugins/outputs/intake_esm.py
79
80
81
82
@staticmethod
def data2row(data):
    """Collect the values of ``data["body"]["properties"]`` into a list.

    The values keep the iteration order of the properties mapping.
    """
    props = data["body"]["properties"]
    return [*props.values()]

export(data, **kwargs)

Write data to disk.

Source code in stac_generator/plugins/outputs/intake_esm.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def export(self, data: dict, **kwargs) -> None:
    """Write data to disk.

    If the JSON specification file does not exist yet, it is created
    together with a new gzipped CSV file holding the header and the
    first row. Otherwise a single row is appended to the existing CSV.

    Args:
        data: Payload whose ``body.properties`` mapping supplies the row.
        **kwargs: Ignored.
    """
    import csv
    import gzip

    if not os.path.exists(self.json_path):
        # Create catalog spec file and CSV file with header and first data row

        # Write ESM-Collection json file
        with open(self.json_path, mode="wt", encoding="utf-8") as f:
            json.dump(self.to_intake_spec(data), f)

        # Write catalog data in csv.gz format. ``newline=""`` lets the
        # csv module control line endings, and the encoding is pinned so
        # the output does not depend on the locale.
        with gzip.open(
            filename=self.csv_path, mode="wt", encoding="utf-8", newline=""
        ) as f:
            w = csv.writer(f)
            w.writerow(self.properties(data))
            w.writerow(self.data2row(data))

    else:
        # Append new data row to CSV file
        with gzip.open(
            filename=self.csv_path, mode="at", encoding="utf-8", newline=""
        ) as f:
            w = csv.writer(f)
            w.writerow(self.data2row(data))

properties(data) staticmethod

Return list of property names.

Note that results may vary from one item to the next.

Source code in stac_generator/plugins/outputs/intake_esm.py
71
72
73
74
75
76
77
@staticmethod
def properties(data):
    """Collect the keys of ``data["body"]["properties"]`` into a list.

    The names come from the given item only, so two items may yield
    different lists.
    """
    props = data["body"]["properties"]
    return [*props.keys()]

to_intake_spec(data)

Return Intake specification file content.

Source code in stac_generator/plugins/outputs/intake_esm.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def to_intake_spec(self, data):
    """Build the dictionary written out as the Intake specification file.

    One attribute entry is produced per property name of ``data``; the
    asset format is looked up in ``ASSET_FORMAT`` using
    ``data["body"]["extension"]``.
    """
    columns = []
    for name in self.properties(data):
        columns.append({"column_name": name})
    file_ext = data["body"]["extension"]

    return {
        "esmcat_version": ESMCAT_VERSION,
        "id": self.conf.namespace,
        "description": self.conf.description,
        "catalog_file": self.csv_path,
        "attributes": columns,
        "assets": {"column_name": "path", "format": ASSET_FORMAT[file_ext]},
    }