Add documentation for handling rebalance #150

Open · wants to merge 3 commits into `master` · changes shown from 1 commit
68 changes: 67 additions & 1 deletion README.md

@@ -33,15 +33,81 @@ val metadata: IO[RecordMetadata] = producer.use { producer =>
## Consumer usage example

```scala
import cats.data.{NonEmptySet => Nes}

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
  for {
    _       <- consumer.subscribe(Nes("topic"))
    records <- consumer.poll(100.millis)
  } yield records
}
```

## Handling consumer group rebalance
It is possible to provide a callback for consumer group rebalance events, which is useful if you want to run some computations,
save state, or commit offsets (or do anything else with the consumer) before the new partition assignment is applied.
This can be done by providing an implementation of `RebalanceListener1` (or its more convenient variant, `RebalanceListener1WithConsumer`),
which requires you to return a `RebalanceCallback` structure describing the actions to perform in each situation.
It allows you to use some of the consumer's methods as well as run arbitrary computations.
Please note that all the actions are executed on the consumer's `poll` thread, which means that running heavy or
long-running computations is discouraged.
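As a hedged illustration of the tuning knob involved (this is a standard Kafka client property, not anything specific to this library): the consumer is considered failed and removed from the group if the gap between `poll` calls exceeds `max.poll.interval.ms`, whose client default is 5 minutes. If rebalance callbacks legitimately need more time, this property can be raised:

```properties
# Standard Kafka consumer property (not library-specific).
# Maximum delay between poll() invocations before the consumer is
# considered failed and the group rebalances. Client default: 5 minutes.
max.poll.interval.ms=300000
```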
> **Contributor:**
>
> > heavy or long-running computations is discouraged.
>
> I think we need to add a short explanation of why it is discouraged and what the options are if it is really needed :)
>
> • it is discouraged because, if the callback takes too long, the Kafka consumer instance can be removed from the consumer group (by default, after 5 minutes the consumer instance is considered dead, roughly speaking), but most probably before those default 5 minutes we would fail with a timeout exception in `ToTry` (the default there is 1 minute)
> • so, to work around those limitations, the user can adjust the `ToTry`/`max.poll.interval.ms` timeouts
>
> I realise now it is really hard to explain such important mechanics in few words, so maybe it deserves a link to a dedicated section about the `ToTry`/`max.poll.interval.ms` timeouts?
>
> More details are available in following issues:

> **Contributor Author:**
>
> Added some clarifications, please check now. Will that be enough?


What you can currently do:
- lift a pure value via `RebalanceCallback.pure(a)`
- raise an error via `RebalanceCallback.fromTry(Failure(error))`
> **Contributor:**
>
> I haven't looked at #146 yet, but maybe we would be able to raise errors via `ApplicativeError` after merging it?

> **Contributor Author:**
>
> We surely would. But it would be a bit strange if this is merged before #146 and mentions functionality that isn't merged yet, no? If this is merged before #146, I could fix the doc in the corresponding PR.

> **Contributor** (@nikitapecasa, May 17, 2021):
>
> Sure, didn't mean to create any confusion, sorry :)
> I'm ok with any of the following options.

> **Contributor Author:**
>
> I added a short description regarding raising errors as well as handling them. Please check if that will be enough.

- perform some consumer-related functionality via exposed methods like `RebalanceCallback.commit` or `RebalanceCallback.seek`
(see `RebalanceCallbackApi` to discover these methods)
- lift any arbitrary computation in the `F[_]` effect via `RebalanceCallback.lift(...)`

These operations can be composed thanks to the `map`/`flatMap` methods and the presence of a `cats.Monad` instance.

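As a minimal sketch of that composition (hedged: it only uses the operations listed above, with `F` left abstract; exact signatures live in `RebalanceCallbackApi` and may differ slightly):

```scala
import scala.util.{Failure, Try}

// A hedged sketch composing the operations listed above in a
// for-comprehension; not taken from the library's own documentation.
def example[F[_]](valid: Boolean): RebalanceCallback[F, Int] =
  for {
    a <- RebalanceCallback.pure(41) // lift a pure value
    b <- RebalanceCallback.fromTry( // succeed or raise an error
           if (valid) Try(a + 1)
           else Failure(new IllegalStateException("invalid state"))
         )
  } yield b
```
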
### Example
```scala
import cats.data.{NonEmptySet => Nes}

class SaveOffsetsOnRebalance[F[_]: Applicative] extends RebalanceListener1WithConsumer[F] {

  // the import is needed to use `fa.lift` syntax where
  // `fa: F[A]`
  // `fa.lift: RebalanceCallback[F, A]`
  import RebalanceCallback.syntax._

  def onPartitionsAssigned(partitions: Nes[TopicPartition]) =
    for {
      // read the offsets from an external store using some custom code not described here
      offsets <- readOffsetsFromExternalStore[F](partitions).lift
      a       <- offsets.toList.foldMapM { case (partition, offset) => consumer.seek(partition, offset) }
    } yield a

  def onPartitionsRevoked(partitions: Nes[TopicPartition]) =
    for {
      positions <- partitions.foldM(Map.empty[TopicPartition, Offset]) {
        case (offsets, partition) =>
          for {
            position <- consumer.position(partition)
          } yield offsets + (partition -> position)
      }
      // save the offsets in an external store using some custom code not described here
      a <- saveOffsetsInExternalStore[F](positions).lift
    } yield a

  // no need to save the offsets since these partitions are probably owned by other consumers already
  def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty

  private def readOffsetsFromExternalStore[F[_]: Applicative](partitions: Nes[TopicPartition]): F[Map[TopicPartition, Offset]] = ???
  private def saveOffsetsInExternalStore[F[_]: Applicative](offsets: Map[TopicPartition, Offset]): F[Unit] = ???
}

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val listener = new SaveOffsetsOnRebalance[IO]
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
  for {
    _       <- consumer.subscribe(Nes("topic"), listener)
    records <- consumer.poll(100.millis)
  } yield records
}
```

## Setup
