Confluent Kafka Message Key Avro Serializer Support #220
That's… interesting. During the original implementation the topic of encoding the keys came up; I must confess we didn't realise the Confluent platform requires the keys to be encoded as well. The easy bit in trying to solve this is the ID of the schema: that can be configured in a similar way. The hard bit is the schema for the key itself.
So far we've been highly opinionated on the topic of keys: the key is the party id, and it's opaque. There's no configurability in that, nor is any intended. The simplest possible answer would then be a single fixed schema for the key.
This would be a bit annoying for users, since they'd have to manually register this fixed schema in order to configure its id. This would harm the out-of-the-box experience and also violate the 'bring your own schema' philosophy. Stepping back a bit, can you elaborate on how the keys are used in the Confluent platform in practice?
It appears that the key schema is most likely a string value for most use cases. However, the key should not be static: it should be configurable per Kafka producer and consumer, since the mapping of messages to partitions can be driven off the message key. I don't think the key schema is intended to be a complex record type; rather, it should be a configurable simple data type. See the excerpt below from https://docs.confluent.io/1.0/schema-registry/docs/serializer-formatter.html:

> In the following example, we read the value of the messages in JSON.
>
> `bin/kafka-avro-console-consumer --topic t1`
>
> You should see following in the console.
>
> `bin/kafka-avro-console-producer --broker-list localhost:9092 --topic t2`
>
> In the shell, type in the following.

In this example, they specify the key schema directly as a string data type. They also show in their documentation how to register a "key" schema and a "value" schema in the Schema Registry: https://docs.confluent.io/current/schema-registry/docs/intro.html#quickstart

I first encountered this problem using the confluent-kafka Python client. The problem I encountered was that the confluent-kafka AvroConsumer expects the key to also be registered with the schema registry. The relevant excerpt from their code:
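Quoting from memory rather than verbatim, the poll logic in question looks roughly like the sketch below; the names follow confluent-kafka-python's AvroConsumer, but treat the details as approximate:

```python
# Rough sketch of confluent_kafka.avro.AvroConsumer.poll(); see the
# confluent-kafka-python sources for the exact implementation.
def poll(self, timeout=None):
    message = super(AvroConsumer, self).poll(timeout)
    if message is None or message.error():
        return message
    if message.value() is not None:
        message.set_value(self._serializer.decode_message(message.value()))
    if message.key() is not None:
        # The key passes through the same decoder, so it must also carry
        # the Confluent header and reference a registered schema.
        message.set_key(self._serializer.decode_message(message.key()))
    return message
```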
Here decode_message checks for the magic byte followed by the 4-byte schema id. You could argue that their AvroConsumer should be more tolerant and allow an arbitrary key that does not need to be registered. However, given that the default Confluent Schema Registry examples also register a corresponding key schema, I believe the producer should have a way to implement this as well.
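For reference, the framing that decode_message validates is the documented Confluent wire format: a zero magic byte, a 4-byte big-endian schema id, and then the Avro-encoded body. A minimal standalone check could look like this (a hypothetical helper, not the library's actual code):

```python
import struct

MAGIC_BYTE = 0

def split_confluent_frame(payload: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(payload) < 5:
        raise ValueError("payload too short for the Confluent 5-byte header")
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("payload does not start with the Confluent magic byte")
    return schema_id, payload[5:]
```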
Relevant code:
An upstream solution is proposed here: confluentinc/confluent-kafka-python#211. I don't know why there's no PR for it, as it doesn't seem very complicated. Solving this on the Python consumer side would be preferable to introducing the complexity of key schemas in Divolte. Message keys are for distribution and identification purposes; they're not data, and they're not intended to be interpreted by application code.
I think my workaround of extracting the deserializer function from the AvroConsumer class and calling it directly is reasonable. That gives you the most flexibility to deserialize the events.
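For anyone else who hits this, the workaround looks roughly like the sketch below. The registry URL, broker list, group id and topic are placeholders, and it assumes the confluent-kafka-python API of that era:

```python
from confluent_kafka import Consumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

# Placeholder addresses and names; adjust for your deployment.
serializer = MessageSerializer(CachedSchemaRegistryClient(url='http://localhost:8081'))
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'divolte-reader'})
consumer.subscribe(['divolte'])

msg = consumer.poll(timeout=10.0)
if msg is not None and not msg.error():
    value = serializer.decode_message(msg.value())  # Confluent-framed Avro value
    key = msg.key()                                 # opaque party id bytes, left undecoded
```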
When operating in "confluent" mode for Kafka, Divolte only serializes the message value with Confluent's special 5-byte message prefix (a 1-byte "magic byte" followed by a 4-byte schema id). When using the Confluent Kafka consumer, the deserialization method expects the message key to have the same format as the message value, and throws an error if it does not.
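To illustrate, the value framing described above amounts to the following (a sketch of the framing only, not Divolte's actual Java code); the key, by contrast, is written as the raw party id with no such prefix, which is what trips up the Confluent consumer:

```python
import struct

def confluent_frame(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an Avro-encoded body with the 5-byte Confluent header."""
    return b"\x00" + struct.pack(">I", schema_id) + avro_body
```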
I would propose adding an option to the configuration that allows the message key to also be serialized in the Confluent-specific format.
This could be done in the "mappings" section of the Divolte configuration file with an option called "confluent_key_id" to specify the message key schema. If that property is present, the same serializerFactory method that is used for the message value could also be used for the key when creating the Kafka producer. A sketch of what this might look like is below.
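A hypothetical configuration sketch, assuming the mapping section keeps its existing confluent_id property; the mapping name and schema path are placeholders:

```
// Hypothetical sketch: confluent_id exists today; confluent_key_id is the proposal.
divolte.mappings.my_mapping {
  schema_file = "/etc/divolte/MyEventRecord.avsc"
  confluent_id = 1       // existing: registry id used to frame the message value
  confluent_key_id = 2   // proposed: registry id used to frame the message key
}
```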