diff --git a/README.md b/README.md index ac4a2f8..8dd8bef 100644 --- a/README.md +++ b/README.md @@ -69,3 +69,48 @@ message Person { repeated PhoneNumber phones = 4; } ``` + +## Configurable example +`io.example.conduktor.custom.deserializers.MyConfigurableDeserializer` + +[located here](./src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala) + +This example allow to show deserializer configuration to change it's behavior. +To configure the behabor, the Deserializer check for a `output` property in it's configuration. + +### Passthrough mode : +With configuration : +```properties +output=passthrough +``` +The data on record are not de coded and returned as-is in bytes array form. + +### Config mode : +With configuration : +```properties +output=config +``` +The configuration is returned on each record deserialization. +For example with configuration +```properties +output=config +other.property=some value +``` +Will always return JSON like +```json +{ + "output": "config", + "other.property": "some value" +} +``` + +### Constant mode : + +With configuration output defined to something else other than `config` or `passthrough` and not empty like: +```properties +output=some constant output +``` +The Deserializer will always return String value like +```json +"some constant output" +``` diff --git a/build.sbt b/build.sbt index b89f14a..6052e4d 100644 --- a/build.sbt +++ b/build.sbt @@ -1,3 +1,5 @@ +import org.typelevel.scalacoptions.ScalacOptions + name := "my_custom_deserializers" version := sys.env.getOrElse("CREATED_TAG", "0.1") scalaVersion := "2.13.10" @@ -8,6 +10,10 @@ libraryDependencies ++= Seq( "com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0" ) +Compile / tpolecatExcludeOptions ++= Set( + ScalacOptions.warnNonUnitStatement, // for scalaPB gen sources +) + assembly / assemblyJarName := "plugins.jar" // ## Github Packages publish configs diff --git a/project/plugins.sbt b/project/plugins.sbt index 0c625af..9e9431b 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,3 @@ -addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2") +addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1") diff --git a/src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala b/src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala new file mode 100644 index 0000000..e502a5a --- /dev/null +++ b/src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala @@ -0,0 +1,37 @@ +package io.example.conduktor.custom.deserializers + +import org.apache.kafka.common.serialization.Deserializer + +import java.util +import scala.jdk.CollectionConverters.MapHasAsScala + +case object MyConfigurableDeserializerException + extends RuntimeException( + "ConfigurableDeserializer fail when its `::configure` method is called without `output` property" + ) + +sealed trait Output +final case class Constant(value: String) extends Output +final case class Config(config: util.Map[String, _]) extends Output +final case object Passthrough extends Output +final case object Unconfigured extends Output + +final class MyConfigurableDeserializer extends Deserializer[Any] { + + var output: Output = Unconfigured + + override def deserialize(topic: String, data: Array[Byte]): Any = output match { + case Constant(value) => value + case Config(config) => config + case Passthrough => data + case Unconfigured => throw MyConfigurableDeserializerException + } + + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = + configs.asScala.get("output").map(_.asInstanceOf[String]) match { + case Some("config") => output = Config(configs) + case Some("passthrough") => output = Passthrough + case Some(value) => output = Constant(value) + case None => throw MyConfigurableDeserializerException + } +}