Skip to content

Elasticsearch

ElasticsearchConf

Bases: BaseModel

Elasticsearch config model.

Source code in stac_generator/plugins/outputs/elasticsearch.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ElasticsearchConf(BaseModel):
    """Elasticsearch config model."""

    index: ElasticsearchIndex = Field(
        description="Elasticsearch index to post to.",
    )
    client_kwargs: dict = Field(
        default={},
        description="Elasticsearch connection kwargs.",
    )
    request_timeout: int = Field(
        default=60,
        description="Request timeout for search.",
    )

ElasticsearchIndex

Bases: BaseModel

Elasticsearch index model.

Source code in stac_generator/plugins/outputs/elasticsearch.py
15
16
17
18
19
20
21
22
23
24
class ElasticsearchIndex(BaseModel):
    """Elasticsearch index model."""

    name: str = Field(
        description="Name of index.",
    )
    mapping: str | dict = Field(
        default={},
        description="Index initial mapping.",
    )

ElasticsearchOutput

Bases: Output

Output generated meta data to elasticsearch.

Plugin name: elasticsearch

Example Configuration

.. code-block:: yaml

- name: elasticsearch
    conf:
      client_kwargs:
        hosts: ['host1','host2']
      index:
        name: 'assets-2021-06-02'
Source code in stac_generator/plugins/outputs/elasticsearch.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class ElasticsearchOutput(Output):
    """
    Output generated meta data to elasticsearch.

    **Plugin name:** ``elasticsearch``

    Example Configuration:
        .. code-block:: yaml

            - name: elasticsearch
                conf:
                  client_kwargs:
                    hosts: ['host1','host2']
                  index:
                    name: 'assets-2021-06-02'
    """

    config_class = ElasticsearchConf

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.es = Elasticsearch(**self.conf.client_kwargs)

        # Create the index, if it doesn't already exist
        if mapping := self.conf.index.mapping:
            if not self.es.indices.exists(self.conf.index.name):
                if isinstance(mapping, str):
                    mapping = load_yaml(mapping)
                self.es.indices.create(self.conf.index.name, body=mapping)

    def export(self, data: dict, **kwargs) -> None:

        self.es.update(
            index=self.conf.index.name,
            id=data["id"],
            body={"doc": data, "doc_as_upsert": True},
            request_timeout=self.conf.request_timeout,
        )