Skip to content

Elasticsearch aggregation

ElasticsearchAggregationExtract

Bases: ExtractionMethod

Using an ID, generate a summary of information for higher-level entities.

Method name: elasticsearch_aggregation

Example Configuration

.. code-block:: yaml

- method: elasticsearch_aggregation
  inputs:
    index: ceda-index
    id_term: item_id
    client_kwargs:
      hosts: ['host1:9200','host2:9200']
    geo_bound:
      - bbox
    min:
      - start_time
    max:
      - end_time
    sum:
      - size
    bucket:
      - term1
      - term2
Source code in extraction_methods/plugins/elasticsearch_aggregation.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
class ElasticsearchAggregationExtract(ExtractionMethod):
    """
    Using an ID, generate a summary of information for higher level entities.

    One Elasticsearch search is run with metric aggregations (``geo_bounds``,
    ``min``, ``max``, ``sum``) and composite ``terms`` aggregations. Values are
    then read from the first hit (``first``), from the metric aggregations and,
    page by page, from the composite aggregations (``bucket``).

    **Method name:** ``elasticsearch_aggregation``

    Example Configuration:
        .. code-block:: yaml

            - method: elasticsearch_aggregation
              inputs:
                index: ceda-index
                id_term: item_id
                client_kwargs:
                  hosts: ['host1:9200','host2:9200']
                geo_bound:
                  - bbox
                min:
                  - start_time
                max:
                  - end_time
                sum:
                  - size
                bucket:
                  - term1
                  - term2
    """

    def __init__(self, **kwargs: Any) -> None:
        """
        Initialise the method and create the Elasticsearch client.

        :param kwargs: keyword arguments forwarded to the parent class
        :type kwargs: Any
        """
        super().__init__(**kwargs)
        self.es = Elasticsearch(**self.input.client_kwargs)

    @staticmethod
    def basic_aggregation(agg_type: str, facet: KeyOutputKey) -> dict[str, Any]:
        """
        Build a single-field metric aggregation of the given type.

        The aggregation is named after the facet key so that the result can
        be looked up by the same key.

        :param agg_type: type of aggregation (e.g. ``min``, ``max``, ``sum``)
        :type agg_type: str
        :param facet: facet to aggregate
        :type facet: KeyOutputKey

        :return: basic aggregation query
        :rtype: dict
        """
        return {facet.key: {agg_type: {"field": facet.key}}}

    @staticmethod
    def facet_composite_aggregation(facet: KeyOutputKey) -> dict[str, Any]:
        """
        Generate the composite aggregation for the facet.

        The aggregation has a single ``terms`` source on the facet key and a
        page size of 100 buckets.

        :param facet: facet to aggregate
        :type facet: KeyOutputKey

        :return: composite aggregation query
        :rtype: dict
        """
        return {
            facet.key: {
                "composite": {
                    "sources": [{facet.key: {"terms": {"field": facet.key}}}],
                    "size": 100,
                }
            }
        }

    def extract_facet(self, aggregations: dict[str, Any], facet: KeyOutputKey) -> Any:
        """
        Function to extract the given facet from the aggregations.

        The first truthy entry among ``value_as_string``, ``bounds`` and
        ``value`` is returned. Falsy values (for example a sum of ``0``) are
        treated as missing.

        :param aggregations: aggregations returned by the search
        :type aggregations: dict
        :param facet: facet to be extracted
        :type facet: KeyOutputKey

        :return: extracted facet, or ``None`` when nothing usable is present
        :rtype: Any
        """
        if aggregation := aggregations.get(facet.key):

            if facet_value := aggregation.get("value_as_string"):
                return facet_value

            if facet_value := aggregation.get("bounds"):
                return facet_value

            if facet_value := aggregation.get("value"):
                return facet_value

        return None

    def extract_first_facet(
        self, properties: dict[str, Any], facet: KeyOutputKey
    ) -> Any:
        """
        Function to extract the given default facet from the first hit.

        :param properties: properties from first record
        :type properties: dict
        :param facet: current facet to be extracted
        :type facet: KeyOutputKey

        :return: extracted facet, or ``None`` when missing or falsy
        :rtype: Any
        """
        if facet_value := properties.get(facet.key):
            return facet_value

        return None

    def extract_facet_lists(
        self,
        query: dict[str, Any],
        aggregations: dict[str, Any],
        facets: list[KeyOutputKey],
    ) -> dict[str, Any]:
        """
        Function to extract the lists of given facets from the aggregation.

        Bucket keys are collected from the supplied aggregations. While any
        composite aggregation reports an ``after_key``, a follow-up search is
        sent for the next page of that aggregation.

        :param query: original query holding the composite aggregations
        :type query: dict
        :param aggregations: aggregations returned by the original query
        :type aggregations: dict
        :param facets: facets to be extracted
        :type facets: list

        :return: extracted list facets keyed by output key
        :rtype: dict
        """
        output = defaultdict(list)

        while True:
            next_query = self.base_query()

            for facet in facets:
                aggregation = aggregations.get(facet.key)
                if not aggregation:
                    continue

                output[facet.output_key].extend(
                    bucket["key"][facet.key] for bucket in aggregation["buckets"]
                )

                # The response is a mapping, so the key must be looked up with
                # .get(); hasattr() never sees it.
                after_key = aggregation.get("after_key")
                if after_key:
                    # Copy the composite body so the caller's query is left
                    # untouched. "after" sits beside "sources", not inside it.
                    composite = dict(query["aggs"][facet.key]["composite"])
                    composite["after"] = after_key
                    next_query["aggs"][facet.key] = {"composite": composite}

            # No aggregation asked for another page: everything is collected.
            if not next_query["aggs"]:
                break

            result = self.es.search(index=self.input.index, body=next_query)
            aggregations = result["aggregations"]

        return output

    def base_query(self) -> dict[str, Any]:
        """
        Base query built from the configured search query.

        :return: base query with no aggregations and a single hit requested
        :rtype: dict
        """
        return {
            "query": self.input.search_query,
            "aggs": {},
            "size": 1,
        }

    def construct_query(self) -> dict[str, Any]:
        """
        Function to create the initial elasticsearch query.

        :return: aggregation query
        :rtype: dict
        """
        query = self.base_query()

        # The input model declares the geo-bounds terms as ``geo_bound``.
        for bbox_term in self.input.geo_bound:
            query["aggs"].update(self.basic_aggregation("geo_bounds", bbox_term))

        for min_term in self.input.min:
            query["aggs"].update(self.basic_aggregation("min", min_term))

        for max_term in self.input.max:
            query["aggs"].update(self.basic_aggregation("max", max_term))

        for sum_term in self.input.sum:
            query["aggs"].update(self.basic_aggregation("sum", sum_term))

        for bucket_term in self.input.bucket:
            query["aggs"].update(self.facet_composite_aggregation(bucket_term))

        return query

    def extract_metadata(
        self, query: dict[str, Any], result: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Function to extract the required metadata from the returned query result.

        :param query: previous query
        :type query: dict
        :param result: results from previous query
        :type result: dict

        :return: metadata
        :rtype: dict
        """
        output = {}

        # A search that matches nothing returns an empty hit list; treat that
        # as "no properties" rather than failing on hits[0].
        hits = result["hits"]["hits"]
        properties = hits[0]["_source"]["properties"] if hits else {}
        aggregations = result["aggregations"]

        for facet in self.input.first:
            if facet_value := self.extract_first_facet(properties, facet):
                output[facet.output_key] = facet_value

        for facet in (
            self.input.geo_bound + self.input.min + self.input.max + self.input.sum
        ):
            if facet_value := self.extract_facet(aggregations, facet):
                output[facet.output_key] = facet_value

        list_output = self.extract_facet_lists(query, aggregations, self.input.bucket)

        output |= list_output

        return output

    def run(self, body: dict[str, Any]) -> dict[str, Any]:
        """
        Query Elasticsearch and merge the extracted metadata into the body.

        :param body: current body
        :type body: dict

        :return: new dictionary of body updated with the extracted metadata
        :rtype: dict
        """
        query = self.construct_query()

        LOGGER.info("Querying Elasticsearch: %s", query)

        # Run query. ``request_tiemout`` is the field name as spelt on the
        # input model.
        result = self.es.search(
            index=self.input.index, body=query, timeout=f"{self.input.request_tiemout}s"
        )

        # Extract metadata
        output = self.extract_metadata(query, result)

        return body | output

base_query()

Base query to filter the results to a single collection.

:return: base query :rtype: dict

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
240
241
242
243
244
245
246
247
248
249
250
251
def base_query(self) -> dict[str, Any]:
    """
    Build the skeleton search body shared by every request.

    The configured search query is used as the ``query`` clause, no
    aggregations are attached yet and a single hit is requested.

    :return: base query
    :rtype: dict
    """
    search_filter = self.input.search_query
    return dict(query=search_filter, aggs={}, size=1)

basic_aggregation(agg_type, facet) staticmethod

Build a single-field metric aggregation query of the given type (for example min, max, sum or geo_bounds).

:param agg_type: type of aggregation :type agg_type: str :param facet: facet to aggregate :type facet: KeyOutputKey

:return: basic aggregation query :rtype: dict

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@staticmethod
def basic_aggregation(agg_type: str, facet: KeyOutputKey) -> dict[str, Any]:
    """
    Build a single-field metric aggregation of the given type.

    The aggregation is named after the facet key and targets the field of
    the same name.

    :param agg_type: type of aggregation
    :type agg_type: str
    :param facet: facet to aggregate
    :type facet: KeyOutputKey

    :return: basic aggregation query
    :rtype: dict
    """
    field_name = facet.key
    aggregation_body = {agg_type: {"field": field_name}}
    return {field_name: aggregation_body}

construct_query()

Function to create the initial elasticsearch query.

:return: aggregation query :rtype: dict

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def construct_query(self) -> dict[str, Any]:
    """
    Function to create the initial elasticsearch query.

    Starts from the base query and adds one metric aggregation per configured
    ``geo_bound``, ``min``, ``max`` and ``sum`` term, followed by one composite
    aggregation per ``bucket`` term.

    :return: aggregation query
    :rtype: dict
    """
    query = self.base_query()

    # The input model declares the geo-bounds terms as ``geo_bound``; reading
    # ``bbox`` fails because no such field is declared.
    metric_terms = (
        ("geo_bounds", self.input.geo_bound),
        ("min", self.input.min),
        ("max", self.input.max),
        ("sum", self.input.sum),
    )

    for agg_type, terms in metric_terms:
        for term in terms:
            query["aggs"].update(self.basic_aggregation(agg_type, term))

    for bucket_term in self.input.bucket:
        query["aggs"].update(self.facet_composite_aggregation(bucket_term))

    return query

extract_facet(aggregations, facet)

Function to extract the given facets from the aggregation.

:param aggregations: aggregations returned by the search :type aggregations: dict :param facet: facet to be extracted :type facet: KeyOutputKey

:return: extracted facet :rtype: Any

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def extract_facet(self, aggregations: dict[str, Any], facet: KeyOutputKey) -> Any:
    """
    Pull the value of a single metric aggregation out of the response.

    The aggregation is looked up by the facet key. The first truthy entry
    among ``value_as_string``, ``bounds`` and ``value`` is returned.

    :param aggregations: aggregations returned by the search
    :type aggregations: dict
    :param facet: facet to be extracted
    :type facet: KeyOutputKey

    :return: extracted facet, or ``None`` when nothing truthy is found
    :rtype: Any
    """
    aggregation = aggregations.get(facet.key)
    if not aggregation:
        return None

    # Order matters: the formatted string wins over bounds and raw value.
    for field in ("value_as_string", "bounds", "value"):
        candidate = aggregation.get(field)
        if candidate:
            return candidate

    return None

extract_facet_lists(query, aggregations, facets)

Function to extract the lists of given facets from the aggregation.

:param query: original query holding the composite aggregations :type query: dict :param aggregations: aggregations returned by the original query :type aggregations: dict :param facets: facets to be extracted :type facets: list

:return: extracted list facets :rtype: dict

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def extract_facet_lists(
    self,
    query: dict[str, Any],
    aggregations: dict[str, Any],
    facets: list[KeyOutputKey],
) -> dict[str, Any]:
    """
    Function to extract the lists of given facets from the aggregation.

    Bucket keys are collected from the supplied aggregations. While any
    composite aggregation reports an ``after_key``, a follow-up search is sent
    for the next page of that aggregation.

    :param query: original query holding the composite aggregations
    :type query: dict
    :param aggregations: aggregations returned by the original query
    :type aggregations: dict
    :param facets: facets to be extracted
    :type facets: list

    :return: extracted list facets keyed by output key
    :rtype: dict
    """
    output = defaultdict(list)

    while True:
        next_query = self.base_query()

        for facet in facets:
            aggregation = aggregations.get(facet.key)
            if not aggregation:
                continue

            output[facet.output_key].extend(
                bucket["key"][facet.key] for bucket in aggregation["buckets"]
            )

            # The response is a mapping, so the key must be looked up with
            # .get(); hasattr() never sees it and pagination would never run.
            after_key = aggregation.get("after_key")
            if after_key:
                # Copy the composite body so the caller's query is left
                # untouched. "after" sits beside "sources", not inside it.
                composite = dict(query["aggs"][facet.key]["composite"])
                composite["after"] = after_key
                next_query["aggs"][facet.key] = {"composite": composite}

        # No aggregation asked for another page: everything is collected.
        if not next_query["aggs"]:
            break

        result = self.es.search(index=self.input.index, body=next_query)
        aggregations = result["aggregations"]

    return output

extract_first_facet(properties, facet)

Function to extract the given default facets from the first hit.

:param properties: properties from first record :type properties: dict :param facet: current facet to be extracted :type facet: KeyOutputKey

:return: extracted facet :rtype: Any

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def extract_first_facet(
    self, properties: dict[str, Any], facet: KeyOutputKey
) -> Any:
    """
    Read a facet straight from the properties of the first hit.

    :param properties: properties from first record
    :type properties: dict
    :param facet: current facet to be extracted
    :type facet: KeyOutputKey

    :return: the stored value, or ``None`` when it is missing or falsy
    :rtype: Any
    """
    stored = properties.get(facet.key)
    return stored if stored else None

extract_metadata(query, result)

Function to extract the required metadata from the returned query result.

:param query: previous query :type query: dict :param result: results from previous query :type result: dict

:return: metadata :rtype: dict

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def extract_metadata(
    self, query: dict[str, Any], result: dict[str, Any]
) -> dict[str, Any]:
    """
    Function to extract the required metadata from the returned query result.

    Values come from three places: the properties of the first hit (``first``
    facets), the metric aggregations (``geo_bound``, ``min``, ``max``, ``sum``
    facets) and the composite aggregations (``bucket`` facets).

    :param query: previous query
    :type query: dict
    :param result: results from previous query
    :type result: dict

    :return: metadata
    :rtype: dict
    """
    output = {}

    # A search that matches nothing returns an empty hit list; treat that as
    # "no properties" rather than failing on hits[0].
    hits = result["hits"]["hits"]
    properties = hits[0]["_source"]["properties"] if hits else {}
    aggregations = result["aggregations"]

    for facet in self.input.first:
        if facet_value := self.extract_first_facet(properties, facet):
            output[facet.output_key] = facet_value

    # The input model declares the geo-bounds terms as ``geo_bound``.
    for facet in (
        self.input.geo_bound + self.input.min + self.input.max + self.input.sum
    ):
        if facet_value := self.extract_facet(aggregations, facet):
            output[facet.output_key] = facet_value

    list_output = self.extract_facet_lists(query, aggregations, self.input.bucket)

    output |= list_output

    return output

facet_composite_aggregation(facet) staticmethod

Generate the composite aggregation for the facet.

:param facet: facet to aggregate :type facet: KeyOutputKey

:return: composite aggregation query :rtype: dict

Source code in extraction_methods/plugins/elasticsearch_aggregation.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@staticmethod
def facet_composite_aggregation(facet: KeyOutputKey) -> dict[str, Any]:
    """
    Build the composite aggregation used to list the values of a facet.

    The aggregation is named after the facet key, has a single ``terms``
    source on that key and asks for 100 buckets per page.

    :param facet: facet to aggregate
    :type facet: KeyOutputKey

    :return: composite aggregation query
    :rtype: dict
    """
    name = facet.key
    terms_source = {name: {"terms": {"field": name}}}
    composite_body = {"sources": [terms_source], "size": 100}
    return {name: {"composite": composite_body}}

ElasticsearchAggregationInput

Bases: Input

Model for Elasticsearch Aggregation Input.

Parameters:

Name Type Description Default
index str

Name of the index holding the STAC entities.

required
id_term str

Term used for aggregating the STAC entities.

required
client_kwargs dict[str, Any]

Parameters passed to elasticsearch client.

{}
search_query dict[str, Any]

Elasticsearch query used to select the documents to aggregate.

{'bool': {'must_not': [{'term': {'categories.keyword': {'value': 'hidden'}}}], 'must': [{'term': {'path': {'value': '$uri'}}}]}}
geo_bound list[KeyOutputKey]

list of terms for which the geo bounds of their aggregate should be returned.

[]
first list[KeyOutputKey]

list of terms for which the first record's value should be returned.

[]
min list[KeyOutputKey]

list of terms for which the minimum of their aggregate should be returned.

[]
max list[KeyOutputKey]

list of terms for which the maximum of their aggregate should be returned.

[]
sum list[KeyOutputKey]

list of terms for which the sum of their aggregate should be returned.

[]
mean list[KeyOutputKey]

list of terms for which the mean of their summed aggregate should be returned.

[]
bucket list[KeyOutputKey]

list of terms for which the list of their aggregate should be returned.

[]
request_tiemout int

Time out for search.

15
allow_multiple bool

True if multiple labels are allowed.

True
output_key str

key to output to.

'label'
Source code in extraction_methods/plugins/elasticsearch_aggregation.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class ElasticsearchAggregationInput(Input):
    """
    Model for Elasticsearch Aggregation Input.

    Field names are part of the configuration interface and are kept exactly
    as they are, including the spelling of ``request_tiemout``.
    """

    index: str = Field(
        description="Name of the index holding the STAC entities.",
    )
    id_term: str = Field(
        description="Term used for aggregating the STAC entities.",
    )
    client_kwargs: dict[str, Any] = Field(
        default={},
        description="Parameters passed to elasticsearch client.",
    )
    # Used as the "query" clause of every search body built by base_query().
    # NOTE(review): "$uri" looks like a placeholder substituted elsewhere;
    # confirm against the base Input class.
    search_query: dict[str, Any] = Field(
        default={
            "bool": {
                "must_not": [{"term": {"categories.keyword": {"value": "hidden"}}}],
                "must": [{"term": {"path": {"value": "$uri"}}}],
            }
        },
        description="Elasticsearch query used to select the documents to aggregate.",
    )
    geo_bound: list[KeyOutputKey] = Field(
        default=[],
        description="list of terms for which the geo bounds of their aggregate should be returned.",
    )
    first: list[KeyOutputKey] = Field(
        default=[],
        description="list of terms for which the first record's value should be returned.",
    )
    min: list[KeyOutputKey] = Field(
        default=[],
        description="list of terms for which the minimum of their aggregate should be returned.",
    )
    max: list[KeyOutputKey] = Field(
        default=[],
        description="list of terms for which the maximum of their aggregate should be returned.",
    )
    sum: list[KeyOutputKey] = Field(
        default=[],
        description="list of terms for which the sum of their aggregate should be returned.",
    )
    # NOTE(review): no code shown on this page reads ``mean``; confirm whether
    # it is used elsewhere.
    mean: list[KeyOutputKey] = Field(
        default=[],
        description="list of terms for which the mean of their summed aggregate should be returned.",
    )
    bucket: list[KeyOutputKey] = Field(
        default=[],
        description="list of terms for which the list of their aggregate should be returned.",
    )
    # The misspelt name is deliberate: run() reads ``request_tiemout`` and
    # formats it as "<value>s".
    request_tiemout: int = Field(
        default=15,
        description="Time out for search.",
    )
    allow_multiple: bool = Field(
        default=True,
        description="True if multiple labels are allowed.",
    )
    output_key: str = Field(
        default="label",
        description="key to output to.",
    )