Elasticsearch

ElasticsearchAssets

Bases: Backend

Searches an Elasticsearch index and yields one asset per hit. The field named by href_term is renamed to href on each returned document.

Method name: elasticsearch

Example Configuration

.. code-block:: yaml

    - method: elasticsearch
      inputs:
        index: ceda-index
        href_term: item_id
        body:
          query:
            bool:
              must:
                - regexp:
                    "path.keyword":
                      value: $data_regex
                - exists:
                    field: md5
              must_not:
                - exists:
                    field: removed
        client_kwargs:
          hosts: ['host1:9200','host2:9200']

Source code in extraction_methods/plugins/assets/backends/elasticsearch.py
class ElasticsearchAssets(Backend):
    """
    Searches an Elasticsearch index and yields one asset per hit. The field
    named by ``href_term`` is renamed to ``href`` on each returned document.

    **Method name:** ``elasticsearch``

    Example Configuration:
        .. code-block:: yaml

            - method: elasticsearch
              inputs:
                index: ceda-index
                href_term: item_id
                body:
                  query:
                    bool:
                      must:
                        - regexp:
                            "path.keyword":
                              value: $data_regex
                        - exists:
                            field: md5
                      must_not:
                        - exists:
                            field: removed
                client_kwargs:
                  hosts: ['host1:9200','host2:9200']
    """

    input_class = ElasticsearchAssetsInput

    @update_input
    def run(self, body: dict[str, Any]) -> Iterator[dict[str, Any]]:

        es = Elasticsearch_client(**self.input.client_kwargs)

        # Run search
        result = es.search(
            index=self.input.index,
            body=self.input.body,
            timeout=f"{self.input.request_timeout}s",
        )

        for hit in result["hits"]["hits"]:
            source = hit["_source"]
            source["href"] = source.pop(self.input.href_term)

            yield source
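
The loop at the end of `run` can be tried out without a cluster. The following is a minimal sketch of that hit-to-asset step, run against a hand-written response shaped like an Elasticsearch search result. The function name `hits_to_assets` and the sample documents are hypothetical; unlike the method above, the sketch copies each `_source` before modifying it.

```python
from typing import Any, Iterator


def hits_to_assets(
    result: dict[str, Any], href_term: str = "path"
) -> Iterator[dict[str, Any]]:
    """Yield each hit's _source with the href_term field renamed to 'href'."""
    for hit in result["hits"]["hits"]:
        source = dict(hit["_source"])  # shallow copy so the response is not mutated
        source["href"] = source.pop(href_term)  # KeyError if the field is absent
        yield source


# Hand-written stand-in for an Elasticsearch search response (illustrative data).
fake_result = {
    "hits": {
        "hits": [
            {"_source": {"item_id": "abc123", "md5": "0" * 32, "size": 10}},
            {"_source": {"item_id": "def456", "md5": "f" * 32, "size": 20}},
        ]
    }
}

assets = list(hits_to_assets(fake_result, href_term="item_id"))
```

Because the rename uses `pop` without a default, a hit that lacks the `href_term` field raises `KeyError`. With a query like the one in the example configuration, it may be worth adding an `exists` clause on that field.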

ElasticsearchAssetsInput

Bases: Input

Model for Elasticsearch Assets Backend Input.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| index | str | Elasticsearch index to search on. | required |
| client_kwargs | dict[str, Any] | Elasticsearch connection kwargs. | {} |
| request_timeout | int | Request timeout for search, in seconds. | 60 |
| body | dict[str, Any] | Body for Elasticsearch search request. | required |
| href_term | str | Term to use for href. | 'path' |

Source code in extraction_methods/plugins/assets/backends/elasticsearch.py
class ElasticsearchAssetsInput(Input):
    """
    Model for Elasticsearch Assets Backend Input.
    """

    index: str = Field(
        description="Elasticsearch index to search on.",
    )
    client_kwargs: dict[str, Any] = Field(
        default={},
        description="Elasticsearch connection kwargs.",
    )
    request_timeout: int = Field(
        default=60,
        description="Request timeout for search.",
    )
    body: dict[str, Any] = Field(
        description="Body for Elasticsearch search request.",
    )
    href_term: str = Field(
        default="path",
        description="Term to use for href.",
    )
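
To make the required fields and defaults concrete, here is a stdlib-only stand-in for the input model. It is not the pydantic class above: `AssetsInputSketch` and its `search_kwargs` method are hypothetical names, used only to show which fields must be supplied and how `request_timeout` ends up as the `"60s"`-style string that `run` passes to the search call.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AssetsInputSketch:
    """Dataclass stand-in mirroring the fields of ElasticsearchAssetsInput."""

    index: str  # required
    body: dict[str, Any]  # required
    client_kwargs: dict[str, Any] = field(default_factory=dict)
    request_timeout: int = 60  # seconds
    href_term: str = "path"

    def search_kwargs(self) -> dict[str, Any]:
        """Build the keyword arguments that run() hands to es.search."""
        return {
            "index": self.index,
            "body": self.body,
            "timeout": f"{self.request_timeout}s",
        }


# Only index and body are supplied; everything else falls back to its default.
inputs = AssetsInputSketch(index="ceda-index", body={"query": {"match_all": {}}})
kwargs = inputs.search_kwargs()
```

The stand-in uses `default_factory=dict` because dataclasses reject a mutable `{}` default outright, which is a stdlib constraint rather than a statement about the pydantic model.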