Skip to content

Commit

Permalink
Fix maintaining pipeline when using AMQP
Browse files Browse the repository at this point in the history
If RabbitMQ droped the connection, pika can emit the StreamLostError
which can be gracefully handled by reconnection attempt. In addition,
consuming on BlockingConnection without the timeout can block internal
maintanence operations, like sending heartbeats [1].

[1] https://pika.readthedocs.io/en/1.2.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.consume
  • Loading branch information
kamil-certat committed Oct 23, 2024
1 parent d5e3e41 commit dba7533
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
### Configuration

### Core
- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski).

### Development

Expand Down
20 changes: 14 additions & 6 deletions intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ def __init__(self, logger, pipeline_args: dict = None, load_balance=False, is_mu
if pika is None:
raise ValueError("To use AMQP you must install the 'pika' library.")
self.properties = pika.BasicProperties(delivery_mode=2) # message persistence
self.heartbeat = 10

def load_configurations(self, queues_type):
self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "10.0.0.1")
Expand All @@ -533,9 +534,9 @@ def load_configurations(self, queues_type):
self.kwargs['ssl_options'] = pika.SSLOptions(context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH))
pika_version = tuple(int(x) for x in pika.__version__.split('.'))
if pika_version < (0, 11):
self.kwargs['heartbeat_interval'] = 10
self.kwargs['heartbeat_interval'] = self.heartbeat
else:
self.kwargs['heartbeat'] = 10
self.kwargs['heartbeat'] = self.heartbeat
if pika_version < (1, ):
# https://groups.google.com/forum/#!topic/pika-python/gz7lZtPRq4Q
self.publish_raises_nack = False
Expand Down Expand Up @@ -607,7 +608,10 @@ def _send(self, destination_queue, message, reconnect=True):
mandatory=True,
)
except Exception as exc: # UnroutableError, NackError in 1.0.0
if reconnect and isinstance(exc, pika.exceptions.ConnectionClosed):
if reconnect and (
isinstance(exc, pika.exceptions.ConnectionClosed)
or isinstance(exc, pika.exceptions.StreamLostError)
):
self.logger.debug('Error sending the message. '
'Will re-connect and re-send.',
exc_info=True)
Expand Down Expand Up @@ -645,9 +649,13 @@ def _receive(self) -> bytes:
if self.source_queue is None:
raise exceptions.ConfigurationError('pipeline', 'No source queue given.')
try:
method, header, body = next(self.channel.consume(self.source_queue))
if method:
self.delivery_tag = method.delivery_tag
method, body = None, None
while not (method or body):
method, _, body = next(
self.channel.consume(self.source_queue, inactivity_timeout=self.heartbeat / 2)
)
if method:
self.delivery_tag = method.delivery_tag
except Exception as exc:
raise exceptions.PipelineError(exc)
else:
Expand Down

0 comments on commit dba7533

Please sign in to comment.