Impossible to handle Unexpected connection close #599

Open
tayp1n opened this issue Nov 15, 2023 · 5 comments

@tayp1n

tayp1n commented Nov 15, 2023

How can I handle an unexpected connection close and raise an exception?

I've got the logic below:

class AMQPHandler:
    def __init__(self) -> None:
        self.connection: AbstractRobustConnection | None = None
        self.channel: AbstractChannel | None = None

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.CorePublisherSettings

        connection = await aio_pika.connect_robust(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
            timeout=config.CONNECTION_TIMEOUT,
        )
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)

        exchange = await channel.declare_exchange(
            config.EXCHANGE_NAME,
            config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.declare_queue(name=q_name)
            await queue.bind(exchange, q_name)
            await queue.consume(self.handle_message)
            logger.info("Queue declared", extra={"queue": q_name})

        self.connection = connection
        self.channel = channel

        logger.info("AMQP handler initialized")

Now, when RabbitMQ goes down, the connection tries to reconnect every 5 seconds. How can I make it raise an exception that I can handle in order to stop the microservice?

@Alviner
Collaborator

Alviner commented Nov 15, 2023

You can use a regular connection instead of a robust one.

Also, the connection.connected event can be checked in a healthcheck handler.
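The healthcheck idea can be sketched roughly as follows. This is only an illustration, assuming the connection object exposes connected as an asyncio.Event, as the comment above describes. The names HasConnectedEvent, is_healthy and wait_until_connected are hypothetical helpers, not part of aio-pika.

```python
import asyncio
from typing import Optional, Protocol


class HasConnectedEvent(Protocol):
    """Hypothetical helper type: any object with a `connected` asyncio.Event."""

    connected: asyncio.Event


def is_healthy(connection: Optional[HasConnectedEvent]) -> bool:
    """Return True only while the connection reports itself as connected."""
    return connection is not None and connection.connected.is_set()


async def wait_until_connected(
    connection: HasConnectedEvent, timeout: float
) -> bool:
    """Wait up to `timeout` seconds for the event; False if it stays unset."""
    try:
        await asyncio.wait_for(connection.connected.wait(), timeout)
    except asyncio.TimeoutError:
        return False
    return True
```

A healthcheck endpoint could return an error status whenever is_healthy(handler.connection) is False, and leave the decision to restart the service to the orchestrator.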

@tayp1n
Author

tayp1n commented Nov 15, 2023

What I also did is

    def on_connection_closed(self, *args, **kwargs):
        sys.exit(1)

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.BaseMessageBrokerSettings

        conn = await aiormq.connect(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
        )
        channel = await conn.channel()

        conn.closing.add_done_callback(self.on_connection_closed)
        channel.closing.add_done_callback(self.on_connection_closed)

        await channel.exchange_declare(
            exchange=config.EXCHANGE_NAME,
            exchange_type=config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.queue_declare(queue=q_name, durable=True)
            self.queues.append(queue)

            await channel.queue_bind(exchange=config.EXCHANGE_NAME, queue=q_name)
            await channel.basic_consume(queue.queue, self.handle_message, no_ack=True)
            logger.info("Queue declared", extra={"queue": q_name})

        self.conn = conn
        self.channel = channel

        logger.info("AMQP handler initialized")
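
As a possible variation on the callback above, the closing future could be awaited from the main coroutine, so that the closure surfaces as an ordinary exception instead of a sys.exit() inside a callback. This is a minimal sketch under assumptions: closing stands in for a future such as conn.closing, ConnectionLost, run_until_closed and main are hypothetical names, and the sketch does not rely on how the real future reports the close reason.

```python
import asyncio


class ConnectionLost(RuntimeError):
    """Hypothetical exception raised once the broker connection has closed."""


async def run_until_closed(closing: asyncio.Future) -> None:
    """Block until `closing` completes, then raise ConnectionLost."""
    # asyncio.wait() returns when the future is done and does not re-raise
    # the future's own exception or cancellation.
    await asyncio.wait([closing])
    cause = None
    if not closing.cancelled():
        # Keep the original error, if there is one, as the cause.
        cause = closing.exception()
    raise ConnectionLost("AMQP connection closed") from cause


async def main(closing: asyncio.Future) -> int:
    """Return a process exit code: 1 when the connection was lost."""
    try:
        await run_until_closed(closing)
    except ConnectionLost:
        # Release resources here before reporting failure.
        return 1
    return 0
```

The service entry point could then pass the returned code to sys.exit(), which keeps the shutdown path in one place.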
       

@Dreamsorcerer

Your original code uses connect_robust(), i.e. the function used to create a connection that automatically reconnects...
Sounds like you don't want to use that.

@gaby

gaby commented Mar 23, 2024

I have the same issue. With connect_robust, if the connection to RabbitMQ is lost, the consumer stops receiving messages after the connection is re-established.

The logs show the consumer trying to reconnect every 5 seconds. Once RabbitMQ becomes reachable again, the logs stop and no new messages show up. In the RabbitMQ UI, that queue has 0 consumers.

I'm using the exact code from here (Master/Worker) but with a dict instead of task_id: https://aio-pika.readthedocs.io/en/latest/patterns.html

@mosquito Any idea?

I'm using aio-pika 9.4.0 with Python 3.10.

@Darsstar
Contributor

This might be fixed by #622 in 9.4.1. It sounds similar enough to what I encountered debugging #615.
