A generic Kafka client tuned for our own usage.
Features:
- Messages are expected to be hashes and are
BSON
-serialized - Connections will be properly closed on exceptions
- Consumer will stop gracefully on
SIGTERM
- At most once consumer semantics are used
- Production via an HTTP proxy (including SSL)
require "cc/kafka"
producer = CC::Kafka::Producer.new("kafka://host:1234/topic", "client-id")
producer.send_message(foo: :bar, baz: :bat)
producer.close
consumer = CC::Kafka::Consumer.new("client-id", ["kafka://host:1234", "..."], "topic", 0)
consumer.on_message do |message|
# Given the producer above, message will be
#
# {
# "foo" => :bar,
# "baz" => :bat,
# CC::Kafka::MESSAGE_OFFSET_KEY => "topic-0-1",
# }
#
end
consumer.start
Note: the value for the MESSAGE_OFFSET_KEY
identifies the message's offset
within the given topic and partition as <topic>-<partition>-<offset>
. It can
be used by consumers to tie created data to the message that lead to it and
prevent duplicate processing.
-
CC::Kafka.offset_model
Must respond to
find_for_create!(attributes)
and return an object that responds toset(attributes)
.The
attributes
used aretopic
,partition
, andcurrent
. And the object returned fromfind_or_create!
must expose methods for each of these.A
Minidoc
-based module is included that can be included in client code for an offset model implementation that will work for many clients.class KafkaOffset < Minidoc include CC::Kafka::OffsetStorage::Minidoc end CC::Kafka.offset_model = KafkaOffset
Note: This is only necessary if using
Consumer
. -
Kafka.logger
This is optional and defaults to
Logger.new(STDOUT)
. The configured object must have the same interface as the standard Ruby logger.Example:
Kafka.logger = Rails.logger
-
Kafka.statsd
This is optional and defaults to a null object. The configured object should represent a statsd client and respond to the usual methods,
increment
,time
, etc. -
Kafka.ssl_ca_file
Path to a custom SSL Certificate Authority file.
Will result in:
http.ca_file = Kafka.ssl_ca_file
-
Kafka.ssl_pem_file
Path to a custom SSL Certificate (and key) in concatenated, PEM format.
Will result in:
pem = File.read(Kafka.ssl_pem_file) http.cert = OpenSSL::X509::Certificate.new(pem) http.key = OpenSSL::PKey::RSA.new(pem)
See LICENSE