From a5168fa7e539a177f498b068caa275480c13502d Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Mon, 10 May 2021 16:50:47 +0200 Subject: [PATCH 1/3] Add documentation for handling rebalance --- README.md | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3fffee58..ff392e87 100644 --- a/README.md +++ b/README.md @@ -33,15 +33,81 @@ val metadata: IO[RecordMetadata] = producer.use { producer => ## Consumer usage example ```scala +import cats.data.{NonEmptySet => Nes} + val consumer = Consumer.of[IO, String, String](config, ecBlocking) val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => for { - _ <- consumer.subscribe(Nel("topic"), None) + _ <- consumer.subscribe(Nes("topic")) records <- consumer.poll(100.millis) } yield records } ``` +## Handling consumer group rebalance +It's possible to provide a callback for a consumer group rebalance event, which can be useful if you want to do some computations, +save the state, commit some offsets (or do anything with the consumer before the new group is applied). +This can be done by providing an implementation of `RebalanceListener1` (or a more convenient version - `RebalanceListener1WithConsumer`) +which requires you to return a `RebalanceCallback` structure which describes what actions should be performed in a certain situation. +It allows you to use some of consumer's methods as well as a way to run an arbitrary computation. +Please note that all the actions are executed on the consumer `poll` thread which means that running heavy or +long-running computations is discouraged. 
+ +What you can currently do: +- lift a pure value via `RebalanceCallback.pure(a)` +- raise an error via `RebalanceCallback.fromTry(Failure(error))` +- perform some consumer-related functionality via exposed methods like `RebalanceCallback.commit` or `RebalanceCallback.seek` + (see `RebalanceCallbackApi` to discover these methods) +- lift any arbitrary computation in the `F[_]` effect via `RebalanceCallback.lift(...)` + +These operations can be composed due to the presence of `map`/`flatMap` methods as well as the presence of `cats.Monad` instance. +### Example +```scala +import cats.data.{NonEmptySet => Nes} + +class SaveOffsetsOnRebalance[F[_]: Applicative] extends RebalanceListener1WithConsumer[F] { + + // import is needed to use `fa.lift` syntax where + // `fa: F[A]` + // `fa.lift: RebalanceCallback[F, A]` + import RebalanceCallback.syntax._ + + def onPartitionsAssigned(partitions: Nes[TopicPartition]) = + for { + // read the offsets from an external store using some custom code not described here + offsets <- readOffsetsFromExternalStore[F](partitions).lift + a <- offsets.toList.foldMapM { case (partition, offset) => consumer.seek(partition, offset) } + } yield a + + def onPartitionsRevoked(partitions: Nes[TopicPartition]) = + for { + positions <- partitions.foldM(Map.empty[TopicPartition, Offset]) { + case (offsets, partition) => + for { + position <- consumer.position(partition) + } yield offsets + (partition -> position) + } + // save the offsets in an external store using some custom code not described here + a <- saveOffsetsInExternalStore[F](positions).lift + } yield a + + // do not need to save the offsets since these partitions are probably owned by other consumers already + def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty + + private def readOffsetsFromExternalStore[F[_]: Applicative](partitions: Nes[TopicPartition]): F[Map[TopicPartition, Offset]] = ??? 
+ private def saveOffsetsInExternalStore[F[_]: Applicative](offsets: Map[TopicPartition, Offset]): F[Unit] = ??? + } + +val consumer = Consumer.of[IO, String, String](config, ecBlocking) +val listener = new SaveOffsetsOnRebalance[IO] +val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => + for { + _ <- consumer.subscribe(Nes("topic"), listener) + records <- consumer.poll(100.millis) + } yield records +} +``` + ## Setup ```scala From 19692c10b06d557422eabacf67badc9386115446 Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Mon, 17 May 2021 13:37:46 +0200 Subject: [PATCH 2/3] Improve the docs --- README.md | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index ff392e87..742bdcf0 100644 --- a/README.md +++ b/README.md @@ -46,18 +46,23 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => ## Handling consumer group rebalance It's possible to provide a callback for a consumer group rebalance event, which can be useful if you want to do some computations, -save the state, commit some offsets (or do anything with the consumer before the new group is applied). +save the state, commit some offsets (or do anything with the consumer whenever partition assignment changes). This can be done by providing an implementation of `RebalanceListener1` (or a more convenient version - `RebalanceListener1WithConsumer`) which requires you to return a `RebalanceCallback` structure which describes what actions should be performed in a certain situation. -It allows you to use some of consumer's methods as well as a way to run an arbitrary computation. +It allows you to use some of consumer's methods as well as a way to run an arbitrary computation. +### Note on long-running computations in a rebalance callback Please note that all the actions are executed on the consumer `poll` thread which means that running heavy or -long-running computations is discouraged. 
+long-running computations is discouraged. This is due to the following reasons: +- if executing the callback takes too long (longer than the Kafka consumer `max.poll.interval.ms` setting), the consumer will be considered +'failed' and the group will rebalance once again. The default value is 300000 (5 minutes). You can see the official documentation [here](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_max.poll.interval.ms) +- since the callback is executed by means of providing an instance of `ToTry` (which runs the computation synchronously in the Java callback), it dictates the timeout for the callback computation to complete. +The current default implementation for `cats.effect.IO` is 1 minute (see `ToTry#ioToTry`) -What you can currently do: +### What you can currently do: - lift a pure value via `RebalanceCallback.pure(a)` - raise an error via `RebalanceCallback.fromTry(Failure(error))` -- perform some consumer-related functionality via exposed methods like `RebalanceCallback.commit` or `RebalanceCallback.seek` - (see `RebalanceCallbackApi` to discover these methods) +- use consumer methods, for example `RebalanceCallback.commit` or `RebalanceCallback.seek` + (see `RebalanceCallbackApi` to discover allowed consumer methods) - lift any arbitrary computation in the `F[_]` effect via `RebalanceCallback.lift(...)` These operations can be composed due to the presence of `map`/`flatMap` methods as well as the presence of `cats.Monad` instance. From 0102ea45787c6fe32a4e60ce3800adec4cc6685e Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Thu, 20 May 2021 17:53:56 +0200 Subject: [PATCH 3/3] Add description of using cats instances --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 742bdcf0..bb3cb149 100644 --- a/README.md +++ b/README.md @@ -59,11 +59,12 @@ long-running computations is discouraged. 
This is due to the following reason The current default implementation for `cats.effect.IO` is 1 minute (see `ToTry#ioToTry`) ### What you can currently do: -- lift a pure value via `RebalanceCallback.pure(a)` -- raise an error via `RebalanceCallback.fromTry(Failure(error))` +- lift a pure value via `RebalanceCallback.pure(a)`. There's also an instance of `cats.Monad` for `RebalanceCallback` which you can use via syntax extensions or direct summoning +- raise an error which should be an instance of `Throwable`. If your effect `F[_]` has an instance of `MonadError[F, Throwable]` in scope, then an instance of `MonadError[RebalanceCallback[F, *], Throwable]` will be derived, so you can use it to `raiseError` via syntax extensions or direct summoning - use consumer methods, for example `RebalanceCallback.commit` or `RebalanceCallback.seek` (see `RebalanceCallbackApi` to discover allowed consumer methods) -- lift any arbitrary computation in the `F[_]` effect via `RebalanceCallback.lift(...)` +- lift any arbitrary computation in the `F[_]` effect via `RebalanceCallback.lift(...)` +- handle occurring errors in the `F[_]` effect via `callback.handleErrorWith(...)` or using the `MonadError` instance described above These operations can be composed due to the presence of `map`/`flatMap` methods as well as the presence of `cats.Monad` instance. ### Example