Skip to content

Rabbit mq

RabbitMQConf

Bases: BaseModel

RabbitMQ config model.

Parameters:

Name Type Description Default
connection RabbitMQConnection

RabbitMQ connection kwargs.

required
exchange RabbitMQExchange

RabbitMQ exchange info.

required
Source code in stac_generator/plugins/outputs/rabbit_mq.py
45
46
47
48
49
50
51
52
53
class RabbitMQConf(BaseModel):
    """Top-level configuration for the RabbitMQ output plugin.

    Groups the broker connection settings and the target exchange settings.
    """

    # How to reach and authenticate against the broker.
    connection: RabbitMQConnection = Field(description="RabbitMQ connection kwargs.")
    # Which exchange messages are published to.
    exchange: RabbitMQExchange = Field(description="RabbitMQ exchange info.")

RabbitMQConnection

Bases: BaseModel

RabbitMQ connection config model.

Parameters:

Name Type Description Default
user str

RabbitMQ user.

required
password str

RabbitMQ password.

required
host str

RabbitMQ host.

required
vhost str

RabbitMQ vhost.

required
kwargs dict

RabbitMQ additional kwargs.

{}
Source code in stac_generator/plugins/outputs/rabbit_mq.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class RabbitMQConnection(BaseModel):
    """Connection settings for a RabbitMQ broker.

    ``user``, ``password``, ``host`` and ``vhost`` are required; ``kwargs``
    holds optional extra connection keyword arguments and defaults to an
    empty dict.
    """

    # Credentials used to authenticate with the broker.
    user: str = Field(description="RabbitMQ user.")
    password: str = Field(description="RabbitMQ password.")

    # Broker location.
    host: str = Field(description="RabbitMQ host.")
    vhost: str = Field(description="RabbitMQ vhost.")

    # Optional extra connection keyword arguments.
    kwargs: dict = Field(default={}, description="RabbitMQ additional kwargs.")

RabbitMQExchange

Bases: BaseModel

RabbitMQ exchange config model.

Parameters:

Name Type Description Default
name str

RabbitMQ exchange name.

required
type str

RabbitMQ exchange type.

required
routing_key str

RabbitMQ exchange routing key.

''
Source code in stac_generator/plugins/outputs/rabbit_mq.py
30
31
32
33
34
35
36
37
38
39
40
41
42
class RabbitMQExchange(BaseModel):
    """Settings describing the RabbitMQ exchange to publish to.

    ``name`` and ``type`` are required; ``routing_key`` defaults to an empty
    string.
    """

    # Exchange identity.
    name: str = Field(description="RabbitMQ exchange name.")
    type: str = Field(description="RabbitMQ exchange type.")

    # Routing key attached to published messages.
    routing_key: str = Field(default="", description="RabbitMQ exchange routing key.")

RabbitMQOutput

Bases: Output

Output to a [RabbitMQ Queue](https://www.rabbitmq.com/).

Plugin name: rabbitmq_out

Example Configuration

.. code-block:: yaml

- name: rabbitmq
  conf:
    connection:
      host: my-rabbit-server.co.uk
      user: user
      password: '*********'
      vhost: my_virtual_host
      kwargs:
        heartbeat: 300
    exchange:
      name: mydest-exchange
      type: fanout
      routing_key: asset
Source code in stac_generator/plugins/outputs/rabbit_mq.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class RabbitMQOutput(Output):
    """
    Output to a `RabbitMQ Queue <https://www.rabbitmq.com/>`_.

    On construction a blocking connection to the broker is opened, a channel
    is created and the configured exchange is declared. Each call to
    :meth:`export` publishes one JSON message to that exchange.

    **Plugin name:** ``rabbitmq_out``

    Example Configuration:
        .. code-block:: yaml

            - name: rabbitmq
              conf:
                connection:
                  host: my-rabbit-server.co.uk
                  user: user
                  password: '*********'
                  vhost: my_virtual_host
                  kwargs:
                    heartbeat: 300
                exchange:
                  name: mydest-exchange
                  type: fanout
                  routing_key: asset
    """

    config_class = RabbitMQConf

    def __init__(self, **kwargs):
        """
        Connect to RabbitMQ and declare the configured exchange.

        :param kwargs: forwarded unchanged to the parent ``Output`` initialiser
        """
        super().__init__(**kwargs)

        connection_conf = self.conf.connection
        exchange_conf = self.conf.exchange

        # Connection parameters: plain user/password credentials plus any
        # extra keyword arguments supplied in the configuration.
        parameters = pika.ConnectionParameters(
            host=connection_conf.host,
            credentials=pika.PlainCredentials(
                connection_conf.user, connection_conf.password
            ),
            virtual_host=connection_conf.vhost,
            **connection_conf.kwargs,
        )

        # Open the connection and keep the channel used for publishing.
        self.channel = pika.BlockingConnection(parameters).channel()
        self.channel.exchange_declare(
            exchange=exchange_conf.name,
            exchange_type=exchange_conf.type,
        )

    def export(self, data: dict, **kwargs) -> None:
        """
        Publish a JSON message for ``data`` to the configured exchange.

        The message has two entries: ``"<surtype>_id"``, where ``<surtype>``
        is ``data["surtype"].value``, and ``"uri"``.

        :param data: expected data as header dict; must contain ``surtype``,
            the matching ``<surtype>_id`` entry and ``uri``
        :param kwargs: accepted but not used by this method
        """
        id_key = f"{data['surtype'].value}_id"
        payload = {id_key: data[id_key], "uri": data["uri"]}

        exchange_conf = self.conf.exchange
        self.channel.basic_publish(
            exchange=exchange_conf.name,
            body=json.dumps(payload),
            routing_key=exchange_conf.routing_key,
        )

export(data, **kwargs)

Export the data to rabbit.

:param data: expected data as header dict

Source code in stac_generator/plugins/outputs/rabbit_mq.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def export(self, data: dict, **kwargs) -> None:
    """
    Publish a JSON message for ``data`` to the configured exchange.

    The message has two entries: ``"<surtype>_id"``, where ``<surtype>`` is
    ``data["surtype"].value``, and ``"uri"``.

    :param data: expected data as header dict; must contain ``surtype``,
        the matching ``<surtype>_id`` entry and ``uri``
    :param kwargs: accepted but not used by this method
    """
    id_key = f"{data['surtype'].value}_id"
    payload = {id_key: data[id_key], "uri": data["uri"]}

    exchange_conf = self.conf.exchange
    self.channel.basic_publish(
        exchange=exchange_conf.name,
        body=json.dumps(payload),
        routing_key=exchange_conf.routing_key,
    )