From dba7533720e92126c11862c0f0e196470a5187be Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Wed, 23 Oct 2024 14:57:16 +0200 Subject: [PATCH] Fix maintaining pipeline when using AMQP If RabbitMQ droped the connection, pika can emit the StreamLostError which can be gracefully handled by reconnection attempt. In addition, consuming on BlockingConnection without the timeout can block internal maintanence operations, like sending heartbeats [1]. [1] https://pika.readthedocs.io/en/1.2.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.consume --- CHANGELOG.md | 1 + intelmq/lib/pipeline.py | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad1ab29b0..04b7336ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ ### Configuration ### Core +- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski). ### Development diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 2cf36f023..8a5731cf6 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -511,6 +511,7 @@ def __init__(self, logger, pipeline_args: dict = None, load_balance=False, is_mu if pika is None: raise ValueError("To use AMQP you must install the 'pika' library.") self.properties = pika.BasicProperties(delivery_mode=2) # message persistence + self.heartbeat = 10 def load_configurations(self, queues_type): self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "10.0.0.1") @@ -533,9 +534,9 @@ def load_configurations(self, queues_type): self.kwargs['ssl_options'] = pika.SSLOptions(context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH)) pika_version = tuple(int(x) for x in pika.__version__.split('.')) if pika_version < (0, 11): - self.kwargs['heartbeat_interval'] = 10 + self.kwargs['heartbeat_interval'] = self.heartbeat else: - self.kwargs['heartbeat'] = 10 + self.kwargs['heartbeat'] = self.heartbeat if pika_version < (1, ): # https://groups.google.com/forum/#!topic/pika-python/gz7lZtPRq4Q self.publish_raises_nack = False @@ -607,7 +608,10 @@ def _send(self, destination_queue, message, reconnect=True): mandatory=True, ) except Exception as exc: # UnroutableError, NackError in 1.0.0 - if reconnect and isinstance(exc, pika.exceptions.ConnectionClosed): + if reconnect and ( + isinstance(exc, pika.exceptions.ConnectionClosed) + or isinstance(exc, pika.exceptions.StreamLostError) + ): self.logger.debug('Error sending the message. ' 'Will re-connect and re-send.', exc_info=True) @@ -645,9 +649,13 @@ def _receive(self) -> bytes: if self.source_queue is None: raise exceptions.ConfigurationError('pipeline', 'No source queue given.') try: - method, header, body = next(self.channel.consume(self.source_queue)) - if method: - self.delivery_tag = method.delivery_tag + method, body = None, None + while not (method or body): + method, _, body = next( + self.channel.consume(self.source_queue, inactivity_timeout=self.heartbeat / 2) + ) + if method: + self.delivery_tag = method.delivery_tag except Exception as exc: raise exceptions.PipelineError(exc) else: