# Add documentation for handling rebalance #150

Open · wants to merge 3 commits into base: master · changes from 2 commits
73 changes: 72 additions & 1 deletion README.md
## Consumer usage example

```scala
import cats.data.{NonEmptySet => Nes}
import cats.effect.IO
import scala.concurrent.duration._
// plus the skafka consumer imports for `Consumer`, `ConsumerRecords` and the config

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
  for {
    _       <- consumer.subscribe(Nes.of("topic"))
    records <- consumer.poll(100.millis)
  } yield records
}
```

## Handling consumer group rebalance
It's possible to provide a callback for a consumer group rebalance event, which can be useful if you want to do some computations,
save the state, commit some offsets (or do anything with the consumer whenever partition assignment changes).
This can be done by providing an implementation of `RebalanceListener1` (or a more convenient version - `RebalanceListener1WithConsumer`)
which requires you to return a `RebalanceCallback` structure which describes what actions should be performed in a certain situation.
It allows you to use some of consumer's methods as well as a way to run an arbitrary computation.
### Note on long-running computations in a rebalance callback
Please note that all actions are executed on the consumer `poll` thread, so running heavy or
long-running computations is discouraged, for the following reasons:
- if executing the callback takes too long (longer than the Kafka consumer `max.poll.interval.ms` setting), the consumer is considered
'failed' and the group rebalances once again. The default value is 300000 ms (5 minutes). See the official documentation [here](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_max.poll.interval.ms)
- since the callback is executed by means of a `ToTry` instance (which runs the computation synchronously inside the Java callback), that instance dictates the timeout within which the callback computation must complete.
The current default implementation for `cats.effect.IO` is 1 minute (see `ToTry#ioToTry`)
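Conceptually, the synchronous wait that `ToTry` performs inside the Java callback resembles awaiting a `Future` with a timeout. This plain-Scala sketch (standard library only, not the skafka API) shows why work that outlives the timeout turns into a failure:

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A cheap computation completes well within the timeout.
val quick  = Future { 21 * 2 }
val result = Await.result(quick, 1.minute) // returns 42

// A computation that outlives the timeout makes the waiting thread fail;
// in a rebalance callback this would surface as a callback error.
val slow = Future { Thread.sleep(5000); 42 }
val timedOut =
  try { Await.result(slow, 100.millis); false }
  catch { case _: TimeoutException => true }
```

This is why the advice above is to keep callback actions cheap and push heavy work outside the `poll` thread.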

### What you can currently do:
- lift a pure value via `RebalanceCallback.pure(a)`
- raise an error via `RebalanceCallback.fromTry(Failure(error))`
> **Reviewer (Contributor):** I haven't looked at #146 yet, but maybe we would be able to raise errors via `ApplicativeError` after merging it?
>
> **Author:** We surely would. But it would be a bit strange if this were merged before #146 and mentioned functionality that isn't merged yet, no? If this is merged before #146, I could fix the doc in the corresponding PR.
>
> **Reviewer (nikitapecasa, May 17, 2021):** Sure, didn't mean to create any confusion, sorry :) I'm ok with any of the following options.
>
> **Author:** I added a short description regarding raising errors as well as handling them. Please check whether that is enough.

- use consumer methods, for example `RebalanceCallback.commit` or `RebalanceCallback.seek`
(see `RebalanceCallbackApi` to discover allowed consumer methods)
- lift any arbitrary computation in the `F[_]` effect via `RebalanceCallback.lift(...)`

These operations can be composed, thanks to the `map`/`flatMap` methods and the presence of a `cats.Monad` instance.
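To illustrate how a description of actions can compose via `map`/`flatMap` and only execute when interpreted, here is a minimal, self-contained model in the same spirit. It is a deliberately simplified stand-in, not the actual `RebalanceCallback` from skafka:

```scala
// A tiny callback *description*: nothing runs until an interpreter is applied,
// which mirrors how a RebalanceCallback is only executed during a rebalance.
sealed trait Callback[A] {
  def flatMap[B](f: A => Callback[B]): Callback[B] = Callback.Bind(this, f)
  def map[B](f: A => B): Callback[B]               = flatMap(a => Callback.Pure(f(a)))
}
object Callback {
  final case class Pure[A](a: A)                                    extends Callback[A]
  final case class Lift[A](run: () => A)                            extends Callback[A]
  final case class Bind[A, B](fa: Callback[A], f: A => Callback[B]) extends Callback[B]

  // The interpreter: walks the structure and actually performs the effects.
  def run[A](cb: Callback[A]): A = cb match {
    case Pure(a)     => a
    case Lift(thunk) => thunk()
    case Bind(fa, f) => run(f(run(fa)))
  }
}

// Descriptions compose in a for-comprehension before anything executes:
val program: Callback[Int] =
  for {
    a <- Callback.Pure(40)
    b <- Callback.Lift(() => 2)
  } yield a + b

val answer = Callback.run(program) // 42
```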
### Example
```scala
import cats.Applicative
import cats.data.{NonEmptySet => Nes}
import cats.syntax.all._

class SaveOffsetsOnRebalance[F[_]: Applicative] extends RebalanceListener1WithConsumer[F] {

  // import is needed to use `fa.lift` syntax where
  // `fa: F[A]`
  // `fa.lift: RebalanceCallback[F, A]`
  import RebalanceCallback.syntax._

  def onPartitionsAssigned(partitions: Nes[TopicPartition]) =
    for {
      // read the offsets from an external store using some custom code not described here
      offsets <- readOffsetsFromExternalStore[F](partitions).lift
      a       <- offsets.toList.foldMapM { case (partition, offset) => consumer.seek(partition, offset) }
    } yield a

  def onPartitionsRevoked(partitions: Nes[TopicPartition]) =
    for {
      positions <- partitions.foldM(Map.empty[TopicPartition, Offset]) {
        case (offsets, partition) =>
          for {
            position <- consumer.position(partition)
          } yield offsets + (partition -> position)
      }
      // save the offsets in an external store using some custom code not described here
      a <- saveOffsetsInExternalStore[F](positions).lift
    } yield a

  // no need to save the offsets since these partitions are probably owned by other consumers already
  def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty

  private def readOffsetsFromExternalStore[F[_]: Applicative](partitions: Nes[TopicPartition]): F[Map[TopicPartition, Offset]] = ???
  private def saveOffsetsInExternalStore[F[_]: Applicative](offsets: Map[TopicPartition, Offset]): F[Unit] = ???
}

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val listener = new SaveOffsetsOnRebalance[IO]
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
  for {
    _       <- consumer.subscribe(Nes.of("topic"), listener)
    records <- consumer.poll(100.millis)
  } yield records
}
```
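The save/restore round-trip performed by `onPartitionsRevoked` and `onPartitionsAssigned` above can be sketched with a plain in-memory stand-in for the external store. Everything here is hypothetical and simplified (partitions are keyed by name rather than the skafka `TopicPartition` type):

```scala
import scala.collection.mutable

// Hypothetical stand-in for the external offset store used in the example;
// a real implementation would be a database or similar durable storage.
object ExternalStore {
  private val offsets = mutable.Map.empty[String, Long]

  // onPartitionsRevoked: persist current positions before losing the partitions
  def save(positions: Map[String, Long]): Unit = offsets ++= positions

  // onPartitionsAssigned: restore positions for the newly assigned partitions
  def read(partitions: Set[String]): Map[String, Long] =
    partitions.flatMap(p => offsets.get(p).map(p -> _)).toMap
}

ExternalStore.save(Map("topic-0" -> 10L, "topic-1" -> 25L))

// "topic-2" has no stored offset, so only the known positions come back;
// the listener would then seek the consumer to each restored offset.
val restored = ExternalStore.read(Set("topic-0", "topic-1", "topic-2"))
```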

## Setup
