Skip to content

Rabbit mq

RabbitMQConf

Bases: BaseModel

RabbitMQ config model.

Parameters:

Name Type Description Default
connection RabbitMQConnection

RabbitMQ connection kwargs.

required
exchange RabbitMQExchange

RabbitMQ exchange info.

required
queues list[RabbitMQQueue]

RabbitMQ queues to bind.

[]
uri_term str

Attritube to use as uri.

'uri'
extra_terms list[KeyOutputKey]

List of extra attributes.

[]
Source code in stac_generator/plugins/inputs/rabbit_mq.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class RabbitMQConf(BaseModel):
    """RabbitMQ config model."""

    connection: RabbitMQConnection = Field(
        description="RabbitMQ connection kwargs.",
    )
    exchange: RabbitMQExchange = Field(
        description="RabbitMQ exchange info.",
    )
    queues: list[RabbitMQQueue] = Field(
        default=[],
        description="RabbitMQ queues to bind.",
    )
    uri_term: str = Field(description="Attritube to use as uri.", default="uri")
    extra_terms: list[KeyOutputKey] = Field(
        default=[],
        description="List of extra attributes.",
    )

RabbitMQConnection

Bases: BaseModel

RabbitMQ Connection model.

Parameters:

Name Type Description Default
user str

RabbitMQ user.

required
password str

RabbitMQ password.

required
host str

RabbitMQ host.

required
vhost str

RabbitMQ vhost.

required
kwargs dict

RabbitMQ additional kwargs.

{}
Source code in stac_generator/plugins/inputs/rabbit_mq.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class RabbitMQConnection(BaseModel):
    """RabbitMQ Connection model."""

    user: str = Field(
        description="RabbitMQ user.",
    )
    password: str = Field(
        description="RabbitMQ password.",
    )
    host: str = Field(
        description="RabbitMQ host.",
    )
    vhost: str = Field(
        description="RabbitMQ vhost.",
    )
    kwargs: dict = Field(
        default={},
        description="RabbitMQ additional kwargs.",
    )

RabbitMQExchange

Bases: BaseModel

RabbitMQ Exchange model.

Parameters:

Name Type Description Default
name str

RabbitMQ exchange name.

required
type str

RabbitMQ exchange type.

'topic'
kwargs dict

RabbitMQ exchange kwargs.

{}
Source code in stac_generator/plugins/inputs/rabbit_mq.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class RabbitMQExchange(BaseModel):
    """RabbitMQ Exchange model."""

    name: str = Field(
        description="RabbitMQ exchange name.",
    )
    type: str = Field(
        default="topic",
        description="RabbitMQ exchange type.",
    )
    kwargs: dict = Field(
        default={},
        description="RabbitMQ exchange kwargs.",
    )

RabbitMQInput

Bases: BlockingInput

Uses a RabbitMQ Queue <https://www.rabbitmq.com/>_ as a source for events.

Plugin name: rabbitmq

Example Configuration

.. code-block:: yaml

name: rabbitmq
conf:
  connection:
    host: my-rabbit-server.co.uk
    user: user
    password: '*********'
    vhost: my_virtual_host
    kwargs:
      heartbeat: 300
  exchange:
    source_exchange:
      name: mysource-exchange
      type: fanout
    destination_exchange:
      name: mydest-exchange
      type: fanout
  queues:
    - kwargs:
        durable: true
      bind_kwargs:
        routing_key: my.routing.key
      consume_kwargs:
        auto_ack: false
Source code in stac_generator/plugins/inputs/rabbit_mq.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
class RabbitMQInput(BlockingInput):
    """
    Uses a `RabbitMQ Queue <https://www.rabbitmq.com/>`_ as a source for events.

    **Plugin name:** ``rabbitmq``

    Example Configuration:
        .. code-block:: yaml

            name: rabbitmq
            conf:
              connection:
                host: my-rabbit-server.co.uk
                user: user
                password: '*********'
                vhost: my_virtual_host
                kwargs:
                  heartbeat: 300
              exchange:
                source_exchange:
                  name: mysource-exchange
                  type: fanout
                destination_exchange:
                  name: mydest-exchange
                  type: fanout
              queues:
                - kwargs:
                    durable: true
                  bind_kwargs:
                    routing_key: my.routing.key
                  consume_kwargs:
                    auto_ack: false

    """

    config_class = RabbitMQConf

    @staticmethod
    def decode_message(body: bytes) -> dict:
        """
        Takes the message and turns into a dictionary.
        String message format when split on :
            date_hour = split_line[0]
            min = split_line[1]
            sec = split_line[2]
            path = split_line[3]
            action = split_line[4]
            filesize = split_line[5]
            message = ":".join(split_line[6:])

        :param body: Message body, either a json string or text

        """

        # Decode the byte string to utf-8
        body = body.decode("utf-8")

        LOGGER.info("RabbitMQ message recieved: %s", body)

        try:
            msg = json.loads(body)

        except json.JSONDecodeError:
            try:
                msg = ast.literal_eval(body)

            except (ValueError, SyntaxError):
                # Assume the message is in the old format and split on :
                split_line = body.strip().split(":")

                msg = {
                    "datetime": ":".join(split_line[:3]),
                    "uri": split_line[3],
                    "action": split_line[4],
                    "filesize": split_line[5],
                    "message": ":".join(split_line[6:]),
                }

        if "uri" not in msg:
            msg["uri"] = msg["filepath"]

        return msg

    def _connect(self) -> pika.channel.Channel:
        """
        Start Pika connection to server. This is run in each thread.

        :return: pika channel
        """

        # Create the credentials object
        credentials = pika.PlainCredentials(
            self.conf.connection.user, self.conf.connection.password
        )

        # Start the rabbitMQ connection
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.conf.connection.host,
                credentials=credentials,
                virtual_host=self.conf.connection.vhost,
                **self.conf.connection.kwargs,
            )
        )

        # Create a new channel
        channel = connection.channel()

        channel.exchange_declare(
            exchange=self.conf.exchange.name,
            exchange_type=self.conf.exchange.type,
            **self.conf.exchange.kwargs,
        )
        channel.basic_qos(prefetch_count=1)

        # Declare queue and bind queue to the dest exchange
        for queue in self.conf.queues:
            channel.queue_declare(queue=queue.name, **queue.declare_kwargs)

            channel.queue_bind(
                exchange=self.conf.exchange.name, queue=queue.name, **queue.bind_kwargs
            )

            # Set callback
            callback = functools.partial(self.callback, connection=connection)
            channel.basic_consume(
                queue=queue.name, on_message_callback=callback, **queue.consume_kwargs
            )

        return channel

    @staticmethod
    def _acknowledge_message(channel: pika.channel.Channel, delivery_tag: str):
        """
        Acknowledge message

        :param channel: Channel which message came from
        :param delivery_tag: Message id
        """

        LOGGER.debug("Acknowledging message: %s", delivery_tag)
        if channel.is_open:
            channel.basic_ack(delivery_tag)

    def acknowledge_message(
        self,
        channel: pika.channel.Channel,
        delivery_tag: str,
        connection: pika.connection.Connection,
    ):
        """
        Acknowledge message and move onto the next. All of the required
        params come from the message callback params.

        :param channel: callback channel param
        :param delivery_tag: from the callback method param. eg. method.delivery_tag
        :param connection: connection object from the callback param
        """
        cb = functools.partial(self._acknowledge_message, channel, delivery_tag)
        connection.add_callback_threadsafe(cb)

    def callback(
        self,
        ch: pika.channel.Channel,
        method: pika.frame.Method,
        properties: pika.frame.Header,
        body: bytes,
        connection: pika.connection.Connection,
    ) -> None:

        # Get message
        try:
            message = self.decode_message(body)

        except IndexError:
            # Acknowledge message if the message is not compliant
            LOGGER.error("Unable to decode input message: %s", body)
            self.acknowledge_message(ch, method.delivery_tag, connection)
            return

        # Extract uri
        output = {"uri": message[self.conf.uri_term]}

        for extra_term in self.conf.extra_terms:
            output[extra_term.output_key] = message[extra_term.key]

        LOGGER.info("Input processing: %s message: %s", message[self.conf.uri_term], message)

        self.process_method(output)
        self.acknowledge_message(ch, method.delivery_tag, connection)

    def run(self, process_method: Callable):

        self.process_method = process_method

        while True:
            channel = self._connect()

            try:
                LOGGER.info("READY")
                channel.start_consuming()

            except KeyboardInterrupt:
                channel.stop_consuming()
                break

            except pika.exceptions.StreamLostError as e:
                # Log problem
                LOGGER.error("Connection lost, reconnecting", exc_info=e)
                continue

            except Exception as e:
                LOGGER.critical(e, exc_info=True)

                channel.stop_consuming()
                break

acknowledge_message(channel, delivery_tag, connection)

Acknowledge message and move onto the next. All of the required params come from the message callback params.

:param channel: callback channel param :param delivery_tag: from the callback method param. eg. method.delivery_tag :param connection: connection object from the callback param

Source code in stac_generator/plugins/inputs/rabbit_mq.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def acknowledge_message(
    self,
    channel: pika.channel.Channel,
    delivery_tag: str,
    connection: pika.connection.Connection,
):
    """
    Acknowledge message and move onto the next. All of the required
    params come from the message callback params.

    :param channel: callback channel param
    :param delivery_tag: from the callback method param. eg. method.delivery_tag
    :param connection: connection object from the callback param
    """
    cb = functools.partial(self._acknowledge_message, channel, delivery_tag)
    connection.add_callback_threadsafe(cb)

decode_message(body) staticmethod

Takes the message and turns into a dictionary. String message format when split on : date_hour = split_line[0] min = split_line[1] sec = split_line[2] path = split_line[3] action = split_line[4] filesize = split_line[5] message = ":".join(split_line[6:])

:param body: Message body, either a json string or text

Source code in stac_generator/plugins/inputs/rabbit_mq.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@staticmethod
def decode_message(body: bytes) -> dict:
    """
    Takes the message and turns into a dictionary.
    String message format when split on :
        date_hour = split_line[0]
        min = split_line[1]
        sec = split_line[2]
        path = split_line[3]
        action = split_line[4]
        filesize = split_line[5]
        message = ":".join(split_line[6:])

    :param body: Message body, either a json string or text

    """

    # Decode the byte string to utf-8
    body = body.decode("utf-8")

    LOGGER.info("RabbitMQ message recieved: %s", body)

    try:
        msg = json.loads(body)

    except json.JSONDecodeError:
        try:
            msg = ast.literal_eval(body)

        except (ValueError, SyntaxError):
            # Assume the message is in the old format and split on :
            split_line = body.strip().split(":")

            msg = {
                "datetime": ":".join(split_line[:3]),
                "uri": split_line[3],
                "action": split_line[4],
                "filesize": split_line[5],
                "message": ":".join(split_line[6:]),
            }

    if "uri" not in msg:
        msg["uri"] = msg["filepath"]

    return msg

RabbitMQQueue

Bases: BaseModel

RabbitMQ Queue model.

Parameters:

Name Type Description Default
name str

RabbitMQ queue name.

required
declare_kwargs dict

RabbitMQ declare kwargs.

{}
bind_kwargs dict

RabbitMQ bind kwargs.

{}
consume_kwargs dict

RabbitMQ consume kwargs.

{}
Source code in stac_generator/plugins/inputs/rabbit_mq.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class RabbitMQQueue(BaseModel):
    """RabbitMQ Queue model."""

    name: str = Field(
        description="RabbitMQ queue name.",
    )
    declare_kwargs: dict = Field(
        default={},
        description="RabbitMQ declare kwargs.",
    )
    bind_kwargs: dict = Field(
        default={},
        description="RabbitMQ bind kwargs.",
    )
    consume_kwargs: dict = Field(
        default={},
        description="RabbitMQ consume kwargs.",
    )