
Confluent Kafka Message Key Avro Serializer Support #220

Open
scottkeller opened this issue May 17, 2018 · 5 comments

@scottkeller

When operating in "confluent" mode for Kafka, Divolte only serializes the message value with Confluent's special 5-byte message prefix (a 1-byte "magic byte" plus a 4-byte schema id). When using the Confluent Kafka consumer, the deserialization method expects the message key to have the same format as the message value, and throws an error if it does not.
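For reference, a minimal sketch of that framing (the function name is mine; the layout is just the standard Confluent wire format):

```python
import struct

def split_confluent_frame(payload: bytes):
    """Split a Confluent-framed Kafka message into (schema_id, avro_body).

    Wire format: one "magic" byte (0x00), a 4-byte big-endian schema id,
    then the Avro-encoded bytes.
    """
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not Confluent-framed: missing or invalid magic byte")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]
```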

I propose adding an option to the configuration that would allow the message key to also be serialized in the Confluent-specific format.

This could be done in the "mappings" section of the Divolte configuration file with an option called "confluent_key_id" to specify the message key schema id. If that property is present, the same serializerFactory method that is used for the message value could also be used for the key when creating the Kafka producer.
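A sketch of what this might look like in the configuration (the confluent_key_id property is the proposal here and is hypothetical; confluent_id is the existing option for the value schema):

```hocon
divolte {
  mappings {
    my_mapping = {
      schema_file = /etc/divolte/MyEventRecord.avsc
      // Existing: schema id used to frame the message value in confluent mode.
      confluent_id = 42
      // Proposed (hypothetical): schema id used to frame the message key.
      confluent_key_id = 43
    }
  }
}
```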

@asnare
Member

asnare commented May 17, 2018

That's… interesting.

During the original implementation of this, the topic of encoding the keys came up. I must confess we didn't realise that the Confluent platform requires the keys to be encoded as well.

The easy bit in trying to solve this is the ID of the schema; that can be configured in a similar way. The hard bits are:

  • What is the schema?
  • What do we map into it?

So far we've been highly opinionated on the topic of keys: it's the party id, and it's opaque. There's no configurability in that, nor is any intended. The simplest possible answers to the questions would then be:

  • A fixed schema with a string for the party id.
  • Filling in the record is hard-coded.

This would be a bit annoying for users, since they'd have to manually register this fixed schema in order to configure its id. This would harm the out-of-the-box experience and also violate the 'bring your own schema' philosophy.
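For example, registering such a fixed key schema via the Schema Registry REST API would look roughly like this (the subject name is illustrative, following the usual `<topic>-key` convention):

```
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "\"string\""}' \
  http://localhost:8081/subjects/divolte-key/versions
```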

Stepping back a bit: can you elaborate on how the keys are used in the Confluent platform in practice?

@scottkeller
Author

It appears that the key schema is most likely a string for most use cases. However, the key should not be static; it should be configurable per Kafka producer and consumer, since you can drive the message partition mapping off of the message key.

I don't think the key schema is intended to be a complex record type; rather, it should just be a configurable simple data type. See the excerpt below from https://docs.confluent.io/1.0/schema-registry/docs/serializer-formatter.html:

> In the following example, we read the value of the messages in JSON.
>
> ```
> bin/kafka-avro-console-consumer --topic t1 \
>     --zookeeper localhost:2181
> ```
>
> You should see the following in the console.
>
> ```
> {"f1": "value1"}
> ```
>
> In the following example, we send strings and Avro records in JSON as the key and the value of the message, respectively.
>
> ```
> bin/kafka-avro-console-producer --broker-list localhost:9092 --topic t2 \
>     --property parse.key=true \
>     --property key.schema='{"type":"string"}' \
>     --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
> ```
>
> In the shell, type in the following.
>
> ```
> "key1" \t {"f1": "value1"}
> ```

In this example, they specify the key schema directly as a string data type. However, their documentation also shows how you would register a "key" schema and a "value" schema in the Schema Registry: https://docs.confluent.io/current/schema-registry/docs/intro.html#quickstart

I first encountered this problem using the confluent-kafka Python client: its AvroConsumer expects the key to also be registered with the schema registry. The relevant excerpt from their code:

```python
message = super(AvroConsumer, self).poll(timeout)
if message is None:
    return None
if not message.value() and not message.key():
    return message
if not message.error():
    if message.value() is not None:
        decoded_value = self._serializer.decode_message(message.value())
        message.set_value(decoded_value)
    if message.key() is not None:
        decoded_key = self._serializer.decode_message(message.key())
        message.set_key(decoded_key)
return message
```

Here decode_message checks for the magic byte followed by the 4-byte schema id.

You could argue that their AvroConsumer should be more tolerant and allow an arbitrary key that does not need to be registered. However, given that the default examples for the Confluent Kafka Schema Registry also register a corresponding key schema, I believe the producer should have a way to implement this as well.
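For illustration, here is roughly what the producer side looks like with the confluent-kafka Python client's AvroProducer, which frames both key and value (a sketch; the broker, registry URL, topic, and schemas are taken from the docs excerpt above):

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Key is a plain Avro string; value is the record type from the docs excerpt.
key_schema = avro.loads('{"type": "string"}')
value_schema = avro.loads(
    '{"type": "record", "name": "myrecord",'
    ' "fields": [{"name": "f1", "type": "string"}]}'
)

producer = AvroProducer(
    {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
    },
    default_key_schema=key_schema,
    default_value_schema=value_schema,
)

# Both key and value get the magic byte + 4-byte schema id prefix, and the
# schemas are registered under the t2-key and t2-value subjects.
producer.produce(topic="t2", key="key1", value={"f1": "value1"})
producer.flush()
```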

@asnare
Member

asnare commented May 21, 2018

@friso
Collaborator

friso commented May 22, 2018

An upstream solution is proposed here: confluentinc/confluent-kafka-python#211

I don't know why there's no PR for it, as it doesn't seem very complicated. Solving this on the Python consumer side would be preferable to introducing the complexity of adding key schemas in Divolte. Message keys are for distribution and identification purposes. They're not data and not intended to be interpreted by application code.

@OneCricketeer

I think my workaround of extracting the deserializer function from the AvroConsumer class and calling it directly is reasonable.

That gives you the most flexibility to deserialize the events.

Ref confluentinc/confluent-kafka-python#428 (comment)
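A sketch of that workaround, assuming the legacy confluent_kafka.avro module: use a plain Consumer and call the serializer's decode_message directly, so a key that isn't Confluent-framed doesn't make poll throw:

```python
from confluent_kafka import Consumer
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "example",
    "auto.offset.reset": "earliest",
})
# Newer client versions accept a config dict; older ones take the URL directly.
registry = CachedSchemaRegistryClient({"url": "http://localhost:8081"})
serializer = MessageSerializer(registry)

consumer.subscribe(["t1"])
msg = consumer.poll(10.0)
if msg is not None and not msg.error():
    # Decode the Confluent-framed value ourselves...
    value = serializer.decode_message(msg.value())
    # ...but leave the key as raw bytes, since Divolte doesn't frame it.
    key = msg.key()
```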
