Skip to content

Rabbit mq

RabbitMQBulkOutput

Bases: BulkOutput

Uses a RabbitMQ Queue <https://www.rabbitmq.com/>_ as a destination for parent generation messages.

Plugin name: rabbitmq_out_bulk

Example Configuration

.. code-block:: yaml

- name: rabbitmq_bulk
  conf:
    connection:
      host: my-rabbit-server.co.uk
      user: user
      password: '*********'
      vhost: my_virtual_host
      kwargs:
        heartbeat: 300
    exchange:
      name: mydest-exchange
      type: fanout
      routing_key: asset
    cache_max_size: 10
Source code in stac_generator/plugins/bulk_outputs/rabbit_mq.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class RabbitMQBulkOutput(BulkOutput):
    """
    Uses a `RabbitMQ Queue <https://www.rabbitmq.com/>`_ as a destination for parent
    generation messages.

    **Plugin name:** ``rabbitmq_out_bulk``

    Example Configuration:
        .. code-block:: yaml

            - name: rabbitmq_bulk
              conf:
                connection:
                  host: my-rabbit-server.co.uk
                  user: user
                  password: '*********'
                  vhost: my_virtual_host
                  kwargs:
                    heartbeat: 300
                exchange:
                  name: mydest-exchange
                  type: fanout
                  routing_key: asset
                cache_max_size: 10
    """

    config_class = RabbitMQConf

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Create the credentials object
        credentials = pika.PlainCredentials(
            self.conf.connection.user, self.conf.connection.password
        )

        # Start the rabbitMQ connection
        rabbit_connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.conf.connection.host,
                credentials=credentials,
                virtual_host=self.conf.connection.vhost,
                **self.conf.connection.kwargs,
            )
        )

        # Create a new channel
        self.channel = rabbit_connection.channel()
        self.channel.exchange_declare(
            exchange=self.conf.exchange.name,
            exchange_type=self.conf.exchange.type,
        )

    def data_to_cache(self, data: dict) -> None:
        """
        Convert the data into a data to  be stored in cache.

        :param data: data from processor to be output.
        :param kwargs:
        """
        return {
            data["body"][f"{data['surtype']}_id"]: {
                f"{data['surtype']}_id": data["body"][f"{data['surtype']}_id"],
                "uri": data["uri"],
            }
        }

    def export(self, data_list: list) -> None:
        """
        Export the data to rabbit.

        :param data: expected data as header dict
        """

        self.channel.basic_publish(
            exchange=self.conf.exchange.name,
            body=json.dumps(data_list),
            routing_key=self.conf.exchange.routing_key,
        )

data_to_cache(data)

Convert the data into a data to be stored in cache.

:param data: data from processor to be output. :param kwargs:

Source code in stac_generator/plugins/bulk_outputs/rabbit_mq.py
111
112
113
114
115
116
117
118
119
120
121
122
123
def data_to_cache(self, data: dict) -> None:
    """
    Convert the data into a data to  be stored in cache.

    :param data: data from processor to be output.
    :param kwargs:
    """
    return {
        data["body"][f"{data['surtype']}_id"]: {
            f"{data['surtype']}_id": data["body"][f"{data['surtype']}_id"],
            "uri": data["uri"],
        }
    }

export(data_list)

Export the data to rabbit.

:param data: expected data as header dict

Source code in stac_generator/plugins/bulk_outputs/rabbit_mq.py
125
126
127
128
129
130
131
132
133
134
135
136
def export(self, data_list: list) -> None:
    """
    Export the data to rabbit.

    :param data: expected data as header dict
    """

    self.channel.basic_publish(
        exchange=self.conf.exchange.name,
        body=json.dumps(data_list),
        routing_key=self.conf.exchange.routing_key,
    )

RabbitMQConf

Bases: BulkOutputConf

RabbitMQ config model.

Parameters:

Name Type Description Default
connection RabbitMQConnection

RabbitMQ connection kwargs.

{}
exchange RabbitMQExchange

RabbitMQ exchange info.

required
Source code in stac_generator/plugins/bulk_outputs/rabbit_mq.py
46
47
48
49
50
51
52
53
54
55
class RabbitMQConf(BulkOutputConf):
    """RabbitMQ config model."""

    connection: RabbitMQConnection = Field(
        default={},
        description="RabbitMQ connection kwargs.",
    )
    exchange: RabbitMQExchange = Field(
        description="RabbitMQ exchange info.",
    )

RabbitMQConnection

Bases: BaseModel

JSON config model.

Parameters:

Name Type Description Default
user str

RabbitMQ user.

required
password str

RabbitMQ password.

required
host str

RabbitMQ vhost.

required
kwargs dict

RabbitMQ additional kwargs.

{}
Source code in stac_generator/plugins/bulk_outputs/rabbit_mq.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class RabbitMQConnection(BaseModel):
    """JSON config model."""

    user: str = Field(
        description="RabbitMQ user.",
    )
    password: str = Field(
        description="RabbitMQ password.",
    )
    host: str = Field(
        description="RabbitMQ host.",
    )
    host: str = Field(
        description="RabbitMQ vhost.",
    )
    kwargs: dict = Field(
        default={},
        description="RabbitMQ additional kwargs.",
    )

RabbitMQExchange

Bases: BaseModel

JSON config model.

Parameters:

Name Type Description Default
name str

RabbitMQ exchange name.

required
type str

RabbitMQ exchange type.

required
routing_key str

RabbitMQ exchange routing key.

''
Source code in stac_generator/plugins/bulk_outputs/rabbit_mq.py
31
32
33
34
35
36
37
38
39
40
41
42
43
class RabbitMQExchange(BaseModel):
    """JSON config model."""

    name: str = Field(
        description="RabbitMQ exchange name.",
    )
    type: str = Field(
        description="RabbitMQ exchange type.",
    )
    routing_key: str = Field(
        default="",
        description="RabbitMQ exchange routing key.",
    )