Skip to content

Dict aggregator

DictAggregatorExtract

Bases: ExtractionMethod

Aggregate information within a dictionary.

Method name: dict_aggregator

Example Configuration

.. code-block:: yaml

- method: dict_aggregator
  inputs:
    min:
      - start_time
    max:
      - end_time
    sum:
      - size
    bucket:
      - term1
      - term2
Source code in extraction_methods/plugins/dict_aggregator.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class DictAggregatorExtract(ExtractionMethod):
    """
    Aggregate information within a dictionary.

    **Method name:** ``dict_aggregator``

    Every value of ``input_term`` is inspected and the configured keys are
    folded into ``body`` under their ``output_key``:

    * ``bucket`` -- values are appended to a list.
    * ``sum`` -- values are added together.
    * ``mean`` -- values are added together, then divided by the number of
      entries in ``input_term``.
    * ``min`` / ``max`` -- the smallest / largest value is kept.

    Example Configuration:
        .. code-block:: yaml

            - method: dict_aggregator
              inputs:
                min:
                  - start_time
                max:
                  - end_time
                sum:
                  - size
                bucket:
                  - term1
                  - term2
    """

    input_class = DictAggregatorInput

    @update_input
    def run(self, body: dict[str, Any]) -> dict[str, Any]:
        """
        Aggregate the configured terms into ``body`` and return it.

        :param body: dictionary that receives the aggregated values; it is
            modified in place.
        :return: the same ``body`` object, with the aggregates added.
        """
        # The attribute names below are the fields declared on
        # ``DictAggregatorInput`` (``bucket``, ``sum``, ``mean``, ``min``,
        # ``max``).
        terms = self.input

        for value in terms.input_term.values():
            for list_term in terms.bucket:
                if list_term.key in value:
                    body.setdefault(list_term.output_key, []).append(
                        value[list_term.key]
                    )

            # Mean terms are accumulated as a sum here and divided once all
            # entries have been visited.
            for sum_term in itertools.chain(terms.sum, terms.mean):
                if sum_term.key in value:
                    body.setdefault(sum_term.output_key, 0)
                    body[sum_term.output_key] += value[sum_term.key]

            for min_term in terms.min:
                if min_term.key in value and (
                    min_term.output_key not in body
                    or value[min_term.key] < body[min_term.output_key]
                ):
                    body[min_term.output_key] = value[min_term.key]

            for max_term in terms.max:
                # Keep the candidate only when it is strictly greater than
                # the maximum recorded so far.
                if max_term.key in value and (
                    max_term.output_key not in body
                    or value[max_term.key] > body[max_term.output_key]
                ):
                    body[max_term.output_key] = value[max_term.key]

        # The divisor is the total number of entries in ``input_term``,
        # including entries that did not contain the key.
        entry_count = len(terms.input_term)

        for mean_term in terms.mean:
            # Skip terms that were never found (nothing to divide) and avoid
            # dividing by zero when ``input_term`` is empty.
            if entry_count and mean_term.output_key in body:
                body[mean_term.output_key] /= entry_count

        return body

DictAggregatorInput

Bases: Input

Model for Dictionary Aggregator Method Input.

Parameters:

Name Type Description Default
input_term str | dict[str, Any]

term for method to run on.

'$assets'
min list[KeyOutputKey]

list of terms for which the minimum of their aggregate should be returned.

[]
max list[KeyOutputKey]

list of terms for which the maximum of their aggregate should be returned.

[]
sum list[KeyOutputKey]

list of terms for which the sum of their aggregate should be returned.

[]
mean list[KeyOutputKey]

list of terms for which the mean of their summed aggregate should be returned.

[]
bucket list[KeyOutputKey]

list of terms for which the list of their aggregate should be returned.

[]
Source code in extraction_methods/plugins/dict_aggregator.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class DictAggregatorInput(Input):
    """
    Model for Dictionary Aggregator Method Input.

    Besides ``input_term``, every field is a list of ``KeyOutputKey``
    entries selecting one kind of aggregation; each defaults to an empty
    list.
    """

    # Term the method runs on; the default is the string "$assets".
    input_term: str | dict[str, Any] = Field(
        default="$assets",
        description="term for method to run on.",
    )

    # Minimum / maximum aggregations.
    min: list[KeyOutputKey] = Field(
        default=[],
        description=(
            "list of terms for which the minimum of their aggregate "
            "should be returned."
        ),
    )
    max: list[KeyOutputKey] = Field(
        default=[],
        description=(
            "list of terms for which the maximum of their aggregate "
            "should be returned."
        ),
    )

    # Sum / mean aggregations.
    sum: list[KeyOutputKey] = Field(
        default=[],
        description=(
            "list of terms for which the sum of their aggregate "
            "should be returned."
        ),
    )
    mean: list[KeyOutputKey] = Field(
        default=[],
        description=(
            "list of terms for which the mean of their summed aggregate "
            "should be returned."
        ),
    )

    # List ("bucket") aggregation.
    bucket: list[KeyOutputKey] = Field(
        default=[],
        description=(
            "list of terms for which the list of their aggregate "
            "should be returned."
        ),
    )