Skip to content

Elasticsearch

ElasticsearchBulkOutput

Bases: BulkOutput

Outputs to elasticsearch.

Plugin name: elasticsearch_bulk

Example Configuration

```yaml
- name: elasticsearch_bulk
  conf:
    client_kwargs:
      hosts: ['host1','host2']
    index:
      name: 'assets-2021-06-02'
```
Source code in stac_generator/plugins/bulk_outputs/elasticsearch.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class ElasticsearchBulkOutput(BulkOutput):
    """
    Outputs to elasticsearch.

    On construction an ``Elasticsearch`` client is built from
    ``conf.client_kwargs``. When ``conf.index.mapping`` is non-empty and the
    index named ``conf.index.name`` does not exist yet, the index is created
    with that mapping.

    **Plugin name:** ``elasticsearch_bulk``

    Example Configuration:
        .. code-block:: yaml

            - name: elasticsearch_bulk
              conf:
                client_kwargs:
                  hosts: ['host1','host2']
                index:
                  name: 'assets-2021-06-02'

    """

    config_class = ElasticsearchConf

    def __init__(self, **kwargs):
        """
        Build the elasticsearch client and create the index when required.

        :param kwargs: Forwarded unchanged to the base class initialiser.
        """
        super().__init__(**kwargs)

        # ``self.conf`` is not assigned in this class; presumably the base
        # class builds it from ``config_class`` - confirm against BulkOutput.
        self.es = Elasticsearch(**self.conf.client_kwargs)

        # Create the index, if it doesn't already exist. An empty mapping
        # (the default) skips index creation entirely.
        if mapping := self.conf.index.mapping:
            # The index name is passed by keyword: elasticsearch-py 8.x API
            # methods reject positional arguments, and 7.x accepts ``index=``.
            if not self.es.indices.exists(index=self.conf.index.name):
                if isinstance(mapping, str):
                    # A string mapping is handed to ``load_yaml``; presumably
                    # a path to a YAML file - confirm against load_yaml.
                    mapping = load_yaml(mapping)
                self.es.indices.create(index=self.conf.index.name, body=mapping)

    def action_iterator(self, data_list: list) -> Iterator[dict]:
        """
        Generate an iterator of elasticsearch actions.

        Each item yields one ``update`` action with ``doc_as_upsert`` set,
        targeting the configured index.

        :param data_list: List of output data; each item is read through its
            ``"id"`` and ``"body"`` keys

        :returns: Iterator of elasticsearch action dicts
        """
        for data in data_list:

            yield {
                "_op_type": "update",
                "_index": self.conf.index.name,
                "_id": data["id"],
                "doc": data["body"],
                "doc_as_upsert": True,
            }

    def export(self, data_list: list) -> None:
        """
        Export using elasticsearch bulk helper.

        Successful results are suppressed (``yield_ok=False``); every result
        that is yielded as not okay is logged with its id and error.

        :param data_list: List of output data
        """
        # NOTE(review): ``streaming_bulk`` defaults to ``raise_on_error=True``,
        # in which case failed items appear to raise ``BulkIndexError`` rather
        # than being yielded, so the logging branch below may never run -
        # confirm whether ``raise_on_error=False`` was intended.
        for okay, info in streaming_bulk(self.es, self.action_iterator(data_list), yield_ok=False):
            if not okay:
                LOGGER.error(
                    "Unable to index %s: %s",
                    info["update"]["_id"],
                    info["update"]["error"],
                )

action_iterator(data_list)

Generate an iterator of elasticsearch actions.

Parameters: data_list – list of output data.

Returns: an iterator of elasticsearch actions, one per item in data_list.

Source code in stac_generator/plugins/bulk_outputs/elasticsearch.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def action_iterator(self, data_list: list) -> Iterator[dict]:
    """
    Lazily build one elasticsearch bulk action per output record.

    Every action is an ``update`` with ``doc_as_upsert`` enabled, aimed at
    the index named in ``self.conf.index.name``.

    :param data_list: List of output data; each item is read through its
        ``"id"`` and ``"body"`` keys

    :returns: Iterator of elasticsearch action dicts
    """
    yield from (
        {
            "_op_type": "update",
            "_index": self.conf.index.name,
            "_id": record["id"],
            "doc": record["body"],
            "doc_as_upsert": True,
        }
        for record in data_list
    )

export(data_list)

Export using elasticsearch bulk helper.

Source code in stac_generator/plugins/bulk_outputs/elasticsearch.py
 99
100
101
102
103
104
105
106
107
108
109
def export(self, data_list: list) -> None:
    """
    Push a batch of output records to elasticsearch via ``streaming_bulk``.

    Successful results are suppressed (``yield_ok=False``); each result that
    comes back as not okay is logged with the document id and its error.

    :param data_list: List of output data
    """
    # NOTE(review): with streaming_bulk's default ``raise_on_error=True`` the
    # helper may raise instead of yielding failures - confirm the intent.
    actions = self.action_iterator(data_list)
    results = streaming_bulk(self.es, actions, yield_ok=False)

    for succeeded, result in results:
        if succeeded:
            continue

        failure = result["update"]
        LOGGER.error("Unable to index %s: %s", failure["_id"], failure["error"])

ElasticsearchConf

Bases: BulkOutputConf

Elasticsearch config model.

Source code in stac_generator/plugins/bulk_outputs/elasticsearch.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class ElasticsearchConf(BulkOutputConf):
    """Elasticsearch config model.

    Extends ``BulkOutputConf`` with the target index, the connection keyword
    arguments and a request timeout.
    """

    # Required: there is no default, so a configuration must supply it.
    index: ElasticsearchIndex = Field(description="Elasticsearch index to post to.")

    # Optional; empty unless the configuration provides connection settings.
    client_kwargs: dict = Field(
        description="Elasticsearch connection kwargs.",
        default={},
    )

    # NOTE(review): units are not stated here (presumably seconds), and no
    # consumer of this field is visible in this file - confirm where it is used.
    request_timeout: int = Field(
        description="Request timeout for search.",
        default=60,
    )

ElasticsearchIndex

Bases: BaseModel

Elasticsearch index model.

Source code in stac_generator/plugins/bulk_outputs/elasticsearch.py
21
22
23
24
25
26
27
28
29
30
class ElasticsearchIndex(BaseModel):
    """Elasticsearch index model.

    Holds the index name and an optional initial mapping for the index.
    """

    # Required: there is no default.
    name: str = Field(description="Name of index.")

    # Either a mapping dict or a string; defaults to an empty dict.
    # NOTE(review): a string is presumably a reference to a YAML mapping
    # file - confirm against the code that consumes this field.
    mapping: str | dict = Field(
        description="Index initial mapping.",
        default={},
    )