diff --git a/CHANGELOG.md b/CHANGELOG.md index ad1ab29b0..04b7336ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ ### Configuration ### Core +- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski). ### Development diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 2cf36f023..8a5731cf6 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -511,6 +511,7 @@ def __init__(self, logger, pipeline_args: dict = None, load_balance=False, is_mu if pika is None: raise ValueError("To use AMQP you must install the 'pika' library.") self.properties = pika.BasicProperties(delivery_mode=2) # message persistence + self.heartbeat = 10 def load_configurations(self, queues_type): self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "10.0.0.1") @@ -533,9 +534,9 @@ def load_configurations(self, queues_type): self.kwargs['ssl_options'] = pika.SSLOptions(context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH)) pika_version = tuple(int(x) for x in pika.__version__.split('.')) if pika_version < (0, 11): - self.kwargs['heartbeat_interval'] = 10 + self.kwargs['heartbeat_interval'] = self.heartbeat else: - self.kwargs['heartbeat'] = 10 + self.kwargs['heartbeat'] = self.heartbeat if pika_version < (1, ): # https://groups.google.com/forum/#!topic/pika-python/gz7lZtPRq4Q self.publish_raises_nack = False @@ -607,7 +608,10 @@ def _send(self, destination_queue, message, reconnect=True): mandatory=True, ) except Exception as exc: # UnroutableError, NackError in 1.0.0 - if reconnect and isinstance(exc, pika.exceptions.ConnectionClosed): + if reconnect and ( + isinstance(exc, pika.exceptions.ConnectionClosed) + or isinstance(exc, pika.exceptions.StreamLostError) + ): self.logger.debug('Error sending the message. ' 'Will re-connect and re-send.', exc_info=True) @@ -645,9 +649,13 @@ def _receive(self) -> bytes: if self.source_queue is None: raise exceptions.ConfigurationError('pipeline', 'No source queue given.') try: - method, header, body = next(self.channel.consume(self.source_queue)) - if method: - self.delivery_tag = method.delivery_tag + method, body = None, None + while not (method or body): + method, _, body = next( + self.channel.consume(self.source_queue, inactivity_timeout=self.heartbeat / 2) + ) + if method: + self.delivery_tag = method.delivery_tag except Exception as exc: raise exceptions.PipelineError(exc) else: