
Discussion: Trying to understand the motivations for the Serializer API #853

Open
lukestephenson opened this issue May 19, 2023 · 5 comments

Comments

@lukestephenson
Contributor

Below I'm listing a couple of things that don't feel right to me about the Serializer API. I'm not expecting anyone to go and change the API for me; I'm happy to do that myself. This is more to see whether other contributors / users agree with these points before I take the discussion further, rather than submit a PR that isn't welcome only to find out there are good reasons for the current API design. Thanks

Suggestion 1: Serializer trait rework

Currently we have:

trait Serializer[-R, -T] {
  def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]]
}

And note that RIO is an alias for ZIO[R, Throwable, A]

Serialization should not fail. Look at libraries like Circe and zio-json (https://zio.dev/zio-json/): encoding to JSON always works, and we have compile-time guarantees that all data can be encoded. Obviously the same can't be said for deserialization, where the inbound data is outside of our control.
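
To make that concrete, here is a minimal sketch (just a placeholder shape, not a proposal for the exact API; the trait names are made up): if encoding cannot fail, the error channel disappears and URIO (an alias for ZIO[R, Nothing, A]) is enough, or the serializer can even be a plain function that is lifted into ZIO only at the producer boundary.

import org.apache.kafka.common.header.Headers
import zio._

// Sketch only: a serializer whose signature promises it cannot fail.
trait InfallibleSerializer[-R, -T] {
  def serialize(topic: String, headers: Headers, value: T): URIO[R, Array[Byte]]
}

// Or, pushing further, a pure function with no effect at all.
trait PureSerializer[-T] {
  def serialize(topic: String, headers: Headers, value: T): Array[Byte]
}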

Suggestion 2: Serializers / Deserializers should be safe by default

When I'm doing Scala / FP I like avoiding bugs by taking advantage of a powerful type system. A building block for me in this regard is making use of Option to avoid any nasty NPEs. Give me this first, before an effect system. Anyway, the Serializers / Deserializers invoked by zio-kafka can currently be passed null, because the base representation of ConsumerRecord / ProducerRecord is the standard Java implementation. It's not safe by default. For example, if I want to handle that a key / value may be absent, I have to remember to add .asOption to my Deserializer. I feel the safe behaviour, which highlights that Kafka doesn't guarantee the key / value are present, should be the default (i.e. Option[V]), and if I want to acknowledge that I'm happy to discard that type safety, I tell the Deserializer it is ok by calling .valueWillAlwaysExist (or some better method name).

Even on my Serializer I have to call asOption. And this is in my nice Scala code where I represent all my models with Option where appropriate, but throw that away at the last minute on the boundary to Java (i.e. when creating the Kafka ProducerRecord). Then I get a callback which can potentially fail, because the type safety has been given up too early.
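
To sketch what "safe by default" could look like (names like valueWillAlwaysExist are hypothetical, and this is not the zio-kafka API), the nullable Java payload would surface as Option[T] unless the caller explicitly opts out:

import org.apache.kafka.common.header.Headers
import zio._

// Sketch only: Option[T] is the default result; opting out is explicit.
trait SafeDeserializer[-R, +T] { self =>
  def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T]

  // Default entry point used by the consumer: a null payload becomes None.
  final def deserializeNullable(topic: String, headers: Headers, data: Array[Byte]): RIO[R, Option[T]] =
    if (data == null) ZIO.none else self.deserialize(topic, headers, data).map(Some(_))

  // Explicit opt-out: the caller asserts the payload is always present; absence becomes a failure.
  final def valueWillAlwaysExist(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] =
    deserializeNullable(topic, headers, data)
      .someOrFail(new java.util.NoSuchElementException(s"null key/value on topic $topic"))
}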

@erikvanoosten
Copy link
Collaborator

Discussions around zio-kafka usually take place on Discord: https://discord.com/channels/629491597070827530/629497941719121960 Would you mind placing your question there?

Re. 'Serialization should not fail', serialization might depend on external systems. For example, you may have to fetch an id from a database.
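
To illustrate (SchemaRegistry here is a made-up dependency, not something zio-kafka provides): a serializer that has to call an external service can fail, which is exactly what the R and Throwable channels in the current signature allow for.

import org.apache.kafka.common.header.Headers
import zio._

object RegistryBackedSerializerExample {
  // Made-up dependency: looking up a schema id can fail.
  trait SchemaRegistry {
    def idFor(topic: String): Task[Int]
  }

  // The effect and error channels exist precisely because of calls like idFor.
  def serialize(topic: String, headers: Headers, value: String): RIO[SchemaRegistry, Array[Byte]] =
    ZIO.serviceWithZIO[SchemaRegistry](_.idFor(topic)).map { id =>
      // Wire format is illustrative only: one schema-id byte followed by the UTF-8 payload.
      id.toByte +: value.getBytes("UTF-8")
    }
}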

IMHO, letting the Kafka library do the serializing/deserializing has always felt like a kludge. Therefore, I always use the byte array serializer/deserializer and do the conversion in my own code. This gives me the freedom to handle failures the way I want to. It also makes testing easier because you don't have to mess around with someone else's serde interfaces.
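
Roughly, that approach looks like this with zio-kafka 2.x (a sketch only; exact method shapes vary between versions, and "my.topic" plus the decoding step are placeholders):

import java.nio.charset.StandardCharsets
import zio.kafka.consumer.{Consumer, Subscription}
import zio.kafka.serde.Serde

object RawBytesConsumerExample {
  // Consume raw bytes and decode in user code; a decode failure is an ordinary
  // value you can route wherever you want, not a stream failure.
  val rawStream =
    Consumer
      .plainStream(Subscription.topics("my.topic"), Serde.byteArray, Serde.byteArray)
      .map { record =>
        val decoded: Either[String, String] =
          Right(new String(record.value, StandardCharsets.UTF_8)) // swap in your real codec here
        (decoded, record.offset)
      }
}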

Your points about using Option instead of a nullable value are indeed something to be looked at, though it might be hard to change the API in a backwards-compatible way.

@guizmaii
Member

Hey @lukestephenson,

I agree with your observations. I'm not a fan of the current design, either.
I tried to remove the effect system from the De/Serializer interfaces of zio-kafka but didn't manage to find an interesting design.

I'd personally love to see what you can come up with.

@lukestephenson
Contributor Author

@erikvanoosten Thanks for the comments. I'll also kick off the Discord discussion soon.

Re. 'Serialization should not fail', serialization might depend on external systems. For example, you may have to fetch an id from a database.

I completely disagree with this statement. That is not serialization in my opinion; that is a program. By the time we hand the ProducerRecord off to Kafka, the only reason it should fail is because of issues with Kafka itself.

Though it might be hard to change the api in a backward compatible way.

Agree. What I am proposing will not be backwards compatible, nor do I think that should be a design goal. We can keep the existing API alongside it if we want, but backwards compatibility is not a goal of the API I'm proposing.

@guizmaii Thanks as well for the feedback. I had a stab at what it could look like for the producing side. Here is a rough idea of the API (it's not polished, just a demonstration of the idea).

What is exposed in zio-kafka could be something like:

package zio.kafka.producer

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}
import zio._

case class ZKafkaHeader(key: String, value: Array[Byte])
case class ZProducerRecord[K,V](topic: String, key: K, value: V, headers: List[ZKafkaHeader] = List.empty, partition: Option[Integer] = None, timestamp: Option[Long] = None)

object ZProducerRecord {
  // Fairly common to publish a message without a key, so a convenience method for that.
  def apply[V](topic: String, value: V): ZProducerRecord[Option[Array[Byte]], V] = new ZProducerRecord[Option[Array[Byte]], V](topic, Option.empty[Array[Byte]], value)
}

trait ByteArrayEncoder[A] {
  def apply(a: A): Option[Array[Byte]]
}

object Extensions {
  implicit class ZProducerRecordExtensions[K,V](zProducerRecord: ZProducerRecord[K,V]) {
    def encode(implicit keyEncoder: ByteArrayEncoder[K], valueEncoder: ByteArrayEncoder[V]): ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]] = {
      zProducerRecord.copy(key = keyEncoder(zProducerRecord.key), value = valueEncoder(zProducerRecord.value))
    }
  }
}

object Encoders {
  // Provided encoders
  implicit val stringEncoder: ByteArrayEncoder[String] = new ByteArrayEncoder[String] {
    private val kafkaSerializer = new StringSerializer()
    override def apply(a: String): Option[Array[Byte]] = Some(kafkaSerializer.serialize(null, a))
  }

  implicit val longEncoder: ByteArrayEncoder[Long] = new ByteArrayEncoder[Long] {
    val kafkaSerializer = new LongSerializer()
    override def apply(a: Long): Option[Array[Byte]] = Some(kafkaSerializer.serialize(null, a))
  }

  implicit val byteArrayEncoder: ByteArrayEncoder[Array[Byte]] = (a: Array[Byte]) => Some(a)

  implicit def optionEncoder[A](implicit encoder: ByteArrayEncoder[A]): ByteArrayEncoder[Option[A]] = (a: Option[A]) => a.flatMap(encoder.apply)
}

trait ProducerProposal {
  def produceAsync(record: ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]]): Task[Task[RecordMetadata]] = ???

  // IMO this makes a lot more sense than the current implementation, which doesn't provide flexibility for different
  // records in the Chunk to have different serialisation strategies, even though the Chunk could contain records for many different topics.
  def produceAsyncChunk(records: Chunk[ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]]]): Task[Task[Chunk[RecordMetadata]]] = ???
}

And for an end user, a sample application might look like:

package zio.kafka.producer

import Encoders._
import Extensions._

case class ExampleModel(value: String)

object EndUserExample {

  implicit val exampleModelEncoder: ByteArrayEncoder[ExampleModel] = (a: ExampleModel) => stringEncoder.apply(a.value)

  val producer: ProducerProposal = ???

  // I don't care about encoding, I've already provided the raw bytes. No call to `encode` required.
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = None, value = Some("hello".getBytes)))

  // Message with a value only
  producer.produceAsync(ZProducerRecord(topic = "my.topic", value = "hello").encode)

  // Message with a key and value
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = "hello world").encode)

  // compacted topic with value provided
  val maybeValueIsPresent: Option[ExampleModel] = Some(ExampleModel("hello"))
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = maybeValueIsPresent).encode)

  // compacted topic with tombstone
  val maybeValueIsATombstone: Option[ExampleModel] = None
  producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = maybeValueIsATombstone).encode)
}

Feel free to share feedback.

@erikvanoosten
Collaborator

Further comments from me will go to Discord.

@domdorn

domdorn commented Jun 18, 2024

Discussions around zio-kafka usually take place on Discord: https://discord.com/channels/629491597070827530/629497941719121960 Would you mind placing your question there?

It's too bad that you do this. I was interested in reading the rest of this discussion, and it's super cumbersome to do so: first I need an account on another platform (ok, I have that), but then I have to search through thousands of messages to find the ones that correspond to this topic, which is impossible after some time has passed.
