Skip to content

Commit

Permalink
Add init_kafka_producer
Browse files Browse the repository at this point in the history
This code, originally from sqrbot-jr, sets up an AIOKafkaProducer in a
cleanup context. It integrates the kafka_ssl_context work.
  • Loading branch information
jonathansick committed Mar 11, 2020
1 parent a76af6c commit f84f178
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,5 @@ exclude = '''
include_trailing_comma = true
multi_line_output = 3
known_first_party = "safir"
known_third_party = ["aiohttp", "kafkit", "pytest", "setuptools", "structlog"]
known_third_party = ["aiohttp", "aiokafka", "kafkit", "pytest", "setuptools", "structlog"]
skip = ["docs/conf.py"]
61 changes: 60 additions & 1 deletion src/safir/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import TYPE_CHECKING

import structlog
from aiokafka import AIOKafkaProducer
from kafkit.ssl import concatenate_certificates, create_ssl_context

__all__ = ["configure_kafka_ssl"]
__all__ = ["configure_kafka_ssl", "init_kafka_producer"]

if TYPE_CHECKING:
from typing import AsyncGenerator
Expand Down Expand Up @@ -83,3 +85,60 @@ async def configure_kafka_ssl(app: Application) -> AsyncGenerator:
logger.info("Created Kafka SSL context")

yield


async def init_kafka_producer(app: Application) -> AsyncGenerator:
"""Initialize and cleanup the aiokafka producer instance.
Parameters
----------
app : `aiohttp.web.Application`
The aiohttp.web-based application. This app *must* include a standard
configuration object at the ``"safir/config"`` key. The config must
have these attributes:
``logger_name``
Name of the application's logger.
``kafka_broker_url``
The URL of a Kafka broker.
``kafka_protocol``
The protocol for Kafka broker communication.
Additionally, `configure_kafka_ssl` must be applied **before** this
initializer so that ``safir/kafka_ssl_context`` is set on the
application.
Notes
-----
This initializer adds an `aiokafka.AIOKafkaProducer` instance to the
``app`` under the ``safir/kafka_producer`` key.
Examples
--------
Use this function as a `cleanup context
<https://aiohttp.readthedocs.io/en/stable/web_reference.html#aiohttp.web.Application.cleanup_ctx>`__.
To access the producer:
.. code-block:: python
producer = app["safir/kafka_producer"]
"""
# Startup phase
logger = structlog.get_logger(app["safir/config"].logger_name)
logger.info("Starting Kafka producer")
producer = AIOKafkaProducer(
loop=asyncio.get_running_loop(),
bootstrap_servers=app["safir/config"].kafka_broker_url,
ssl_context=app["safir/kafka_ssl_context"],
security_protocol=app["safir/kafka_protocol"],
)
await producer.start()
app["safir/kafka_producer"] = producer
logger.info("Finished starting Kafka producer")

yield

# Cleanup phase
logger.info("Shutting down Kafka producer")
await producer.stop()

0 comments on commit f84f178

Please sign in to comment.