Graceful shutdown of a single subscription #1201

Draft · svroonland wants to merge 50 commits into master

Conversation

@svroonland (Collaborator) commented Mar 24, 2024

Implements functionality for gracefully stopping the stream for a single subscription: stop fetching records for the assigned topic-partitions, but stay subscribed so that offsets can still be committed. Intended to replace stopConsumption, which does not support multiple-subscription use cases.

Implements some of #941.

We should deprecate stopConsumption before releasing.
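
For a rough idea of the intended shape, here is a hypothetical usage sketch. The method name and the `stream`/`stop` members come from this PR's discussion; the exact signatures and the shutdown trigger are assumptions, not final API:

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Hypothetical sketch; not the final API of this PR.
ZIO.scoped {
  for {
    control <- Consumer.partitionedStreamWithControl(Subscription.topics("my-topic"), Serde.string, Serde.string)
    fiber <- control.stream
               .flatMapPar(Int.MaxValue) { case (_, partitionStream) => partitionStream }
               .map(_.offset)
               .aggregateAsync(Consumer.offsetBatches)
               .mapZIO(_.commit)
               .runDrain
               .fork
    _ <- ZIO.sleep(30.seconds) // placeholder for an application shutdown trigger
    _ <- control.stop          // stop fetching; streams end, pending commits can still complete
    _ <- fiber.join
  } yield ()
}
```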

@svroonland changed the title from "Subscription stream control" to "Graceful shutdown of a single subscription" on Mar 30, 2024
@svroonland svroonland marked this pull request as ready for review March 30, 2024 11:07
@erikvanoosten (Collaborator) left a comment

I didn't look at the implementation yet, only docs and tests.


## Controlled shutdown

The examples above will keep processing records forever, or until the fiber is interrupted, typically at application shutdown. When interrupted, in-flight records will not be processed fully through all stream stages and offsets may not be committed. For fast shutdown in an at-least-once processing scenario this is fine. zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing all in-flight messages to be fully processed.
@erikvanoosten (Collaborator) Mar 30, 2024
> allowing all in-flight messages to be fully processed.

I suggest we change these words to:

> allowing commits for in-flight records to complete.

  1. Let's continue to use the word 'records'.
  2. The word 'all' is removed because, when an interrupt is received, internally queued records are dropped and not passed to the consumer stream.
  3. The remaining in-flight records have already been passed to the consumer, so processing has already commenced. The only relevant operation from zio-kafka's point of view is that the records can be committed.

docs/consuming-kafka-topics-using-zio-streams.md (outdated)

```scala
ZIO.scoped {
  for {
    streamControl <- Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)
```
Collaborator: Could this be laid out for smaller screens, please?

docs/consuming-kafka-topics-using-zio-streams.md (outdated)
```scala
client <- randomClient

keepProducing <- Ref.make(true)
_ <- produceOne(topic, "key", "value").repeatWhileZIO(_ => keepProducing.get).fork
```
Collaborator: You can also use the test helper method `scheduledProduce`.

@erikvanoosten (Collaborator) left a comment

Still need more time to digest this.

@svroonland (Collaborator, Author) commented Apr 3, 2024

Hmm, should we instead of this:

```scala
Consumer.runWithGracefulShutdown(Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)) {
  stream => ...
}
```

offer this:

```scala
Consumer.partitionedStreamWithGracefulShutdown(Subscription.topics("topic150"), Serde.string, Serde.string) {
  (stream, _) => stream.flatMapPar(...)
}
```

The second parameter would be the SubscriptionStreamControl, on which you could always manually call stop. Or would that prevent certain use cases... 🤔
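
For instance, stopping on an application-defined condition could look roughly like this (a sketch; the signature is still under discussion and `isShutdownRequested` is a placeholder):

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Sketch only; the API shape is under discussion in this PR.
val isShutdownRequested: UIO[Boolean] = ZIO.succeed(false) // placeholder condition

val app =
  Consumer.partitionedStreamWithGracefulShutdown(Subscription.topics("topic150"), Serde.string, Serde.string) {
    (stream, control) =>
      stream
        .flatMapPar(Int.MaxValue) { case (_, partitionStream) => partitionStream }
        .tap(_ => ZIO.whenZIO(isShutdownRequested)(control.stop)) // stop on our own condition
        .map(_.offset)
        .aggregateAsync(Consumer.offsetBatches)
        .mapZIO(_.commit)
        .runDrain
  }
```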

@erikvanoosten (Collaborator)

> Hmm, should we instead of this:

If I understand it correctly, the proposal allows for more use cases; with it you can also call stop for any condition you want. Is it true that after stopping, you can start consuming again?

@svroonland (Collaborator, Author)

Well, I mean compared to just the partitionedStreamWithControl method. In both cases you would need to do something with the stream that ultimately reduces to a ZIO of Any, so I don't think partitionedStreamWithGracefulShutdown is limiting in that regard.

stop currently doesn't support that, since the stream would then be finished. We could probably build pause and resume like in #941.

@erikvanoosten (Collaborator)

If resume after stop is not supported (and never will be), then I like the first proposal better where you don't need to call stop. What would you do after calling stop?

@svroonland (Collaborator, Author)

Well, in both proposals you can call stop.

I don't think you want to do anything after stop, but it would give you more explicit control when to stop, instead of when the scope ends.

We probably need to decide if we want to add pause/resume in the future. If we do, we should add the control parameter like in the partitionedStreamWithGracefulShutdown example for future compatibility. If we don't, we can drop it altogether and make SubscriptionStreamControl a purely internal concept (if at all).


```diff
 override def plainStream[R, K, V](
   subscription: Subscription,
   keyDeserializer: Deserializer[R, K],
   valueDeserializer: Deserializer[R, V],
-  bufferSize: Int
+  bufferSize: RuntimeFlags
```
Member: This is probably an IDE mistake.

Suggested change:

```diff
-bufferSize: RuntimeFlags
+bufferSize: Int
```

Collaborator: Yeah, happens to me all the time as well.

@guizmaii (Member) commented Apr 5, 2024

Hey :)

Thanks for the great work!

Here's some initial feedback:

I'm not a big fan of the SubscriptionStreamControl implementation. To me, functions/methods returning it should return a tuple (stream, control):

  1. It avoids adding one more concept for our users to understand and learn (Kafka already has a lot of concepts).
  2. It simplifies the interface of the control type; the current one, with its [S <: ZStream[_, _, _]] type parameter, is complex.
  3. It simplifies the return type of our functions/methods, avoiding this kind of type:

```scala
SubscriptionStreamControl[Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]]
```

in favor of:

```scala
(Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]], SubscriptionStreamControl)
```

I made the change in a PR to show/study how, to me, it simplifies things: https://github.com/zio/zio-kafka/pull/1207/files

```diff
-    } yield stream
+    } yield SubscriptionStreamControl(
+      ZStream.fromQueue(partitionAssignmentQueue),
+      withRunloopZIO(requireRunning = true)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
```
@guizmaii (Member) Apr 5, 2024

Suggested change:

```diff
-withRunloopZIO(requireRunning = true)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
+withRunloopZIO(requireRunning = false)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
```

Not sure you want to use true here. true means: if the runloop is not running, start it and apply the stopSubscribedTopicPartitions function.
In your case, IMO, if the Runloop isn't running, calling control.stop should be a no-op, which will be the case if you use false instead.

Also, do we want to execute the partitionAssignmentQueue.offer(Take.end).ignore code if the Runloop isn't running? If not, then something like this would be more appropriate:

```scala
withRunloopZIO(requireRunning = false) { runloop =>
  runloop.stopSubscribedTopicPartitions(subscription) *>
    partitionAssignmentQueue.offer(Take.end).ignore
}
```
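
For reference, a self-contained analog of the `requireRunning` semantics described above (illustration only; the names and shapes are assumed, not the actual zio-kafka `RunloopAccess` implementation):

```scala
import zio._

// requireRunning = true : start the runloop if needed, then run f.
// requireRunning = false: run f only if the runloop is already running, otherwise no-op.
final class RunloopAccess[Runloop](
  ref: Ref.Synchronized[Option[Runloop]],
  makeRunloop: UIO[Runloop]
) {
  def withRunloopZIO(requireRunning: Boolean)(f: Runloop => UIO[Unit]): UIO[Unit] =
    ref.modifyZIO {
      case Some(runloop)          => ZIO.succeed((f(runloop), Some(runloop)))
      case None if requireRunning => makeRunloop.map(r => (f(r), Some(r)))
      case None                   => ZIO.succeed((ZIO.unit, None)) // no-op when not running
    }.flatten
}
```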

Collaborator:

> In your case, IMO, if the Runloop isn't running, calling control.stop should be a no-op

We need to stop the subscription even when the runloop isn't running. Right?

@guizmaii (Member) commented Apr 5, 2024

Didn't finish my review yet. I still have some parts of the code to explore/understand, but I have to go. I'll finish it later 🙂

@svroonland (Collaborator, Author)

Thanks for the feedback, Jules. Agreed that the extra concept would be unwanted. Check out my latest interface proposal, where there is only a plainStreamWithGracefulShutdown method and SubscriptionStreamControl remains hidden.

@erikvanoosten (Collaborator) left a comment

Still reading the code...

Comment on lines 11 to 12:

```scala
 * @tparam S
 *   Type of the stream returned from [[stream]]
```

Actually, thinking about it more, the trait doesn't really care what S is, or whether it is even a stream at all. That means another abstraction might be hidden ('Stoppable'?). Abstracting further is hard though; the definition of stop is pretty specific.

I also noticed that the stop method is defined in terms of the consumer and is not related to the stream. Should that be the case? Shouldn't this stop only relate to the referred stream?

I am trying to weigh this form against @guizmaii's proposal in #1207. I am no longer certain which one I like more.
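
For concreteness, the hinted-at abstraction might look something like this (purely illustrative; not code from this PR):

```scala
import zio._

// 'Stoppable': just the stop capability, independent of what `stream` is.
trait Stoppable {
  def stop: UIO[Unit]
}

// SubscriptionStreamControl would then only add the wrapped value of type S.
final case class SubscriptionStreamControl[S](stream: S, stopSignal: UIO[Unit]) extends Stoppable {
  def stop: UIO[Unit] = stopSignal
}
```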



```diff
@@ -70,6 +70,22 @@ trait Consumer {
     valueDeserializer: Deserializer[R, V]
   ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]

+  /**
+   * Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown
```

Suggested change:

```diff
-   * Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown
+   * Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown.
```

```diff
@@ -93,6 +109,22 @@ trait Consumer {
     valueDeserializer: Deserializer[R, V]
   ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]

+  /**
+   * Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown
```

Suggested change:

```diff
-   * Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown
+   * Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown.
```

Comment on lines +154 to +156:

```scala
/**
 * Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown
 */
```

Since this method might be the most attractive way to use zio-kafka, let's extend the documentation a bit.

e.g.

Suggested change:

```diff
-/**
- * Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown
- */
+/**
+ * Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown. During a graceful
+ * shutdown the consumer is stopped but the stream can complete processing and commit already fetched records.
+ *
+ * Example usage:
+ * {{{
+ * ... todo ...
+ * }}}
+ */
```

We could also include a reference to the documentation (though I am always extremely happy when the scaladocs are all you need; the scalatest documentation is my benchmark).
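
For illustration, a hypothetical snippet of the kind that could fill the "todo" above. The method name comes from this PR; the exact signature, the `(stream, control)` callback shape, and `processRecord` are assumptions:

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Hypothetical example; shapes assumed from this PR's discussion.
def processRecord(record: CommittableRecord[String, String]): UIO[Unit] =
  ZIO.debug(record.value) // placeholder for real processing

val run: ZIO[Consumer, Throwable, Unit] =
  Consumer.plainStreamWithGracefulShutdown(Subscription.topics("my-topic"), Serde.string, Serde.string) {
    (stream, _) => // second parameter: the stream control, unused here
      stream
        .mapZIO(record => processRecord(record).as(record.offset))
        .aggregateAsync(Consumer.offsetBatches)
        .mapZIO(_.commit)
        .runDrain
  }
```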

```diff
@@ -75,6 +75,13 @@ private[consumer] final class Runloop private (
   private[internal] def removeSubscription(subscription: Subscription): UIO[Unit] =
     commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit

+  private[internal] def stopSubscribedTopicPartitions(subscription: Subscription): UIO[Unit] =
```
@erikvanoosten (Collaborator) Apr 7, 2024

This method is stopping (actually ending) streams, not subscriptions. WDYT of:

Suggested change:

```diff
-  private[internal] def stopSubscribedTopicPartitions(subscription: Subscription): UIO[Unit] =
+  private[internal] def endStreams(subscription: Subscription): UIO[Unit] =
```

And similarly rename RunloopCommand.StopSubscribedTopicPartitions to RunloopCommand.EndStreamsBySubscription.

@erikvanoosten (Collaborator) commented Apr 7, 2024

I understand now that when graceful shutdown starts, we're ending the subscribed streams. That should work nicely. Let's work out what will happen next in the runloop. The runloop would still be happily fetching records for that stream. When those are offered to the stream, PartitionStreamControl.offerRecords will probably append those records to the queue (even though it now also contains an 'end' token). Because of the 'end' token that is already in that queue, these new records will never be taken out. Back pressure will kick in (depending on the fetch strategy) and the partitions will be paused. Once we're unsubscribed, 15 seconds later, the queue will be garbage collected. So far so good.

We can do slightly better though. We're fetching and storing all these records in the queue for nothing, even potentially causing an OOM for systems that are tuned for the case where processing happens almost immediately.

My proposal is to:

  1. stop accepting more records in PartitionStreamControl.offerRecords when the queue has been ended (see the sketch below)
  2. in Runloop.handlePoll only pass running streams to fetchStrategy.selectPartitionsToFetch so that partitions for ended streams are immediately paused

If you want, I can extend this PR with that proposal (or create a separate PR).
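
A minimal, self-contained sketch of point 1 (illustration only; `PartitionQueue` and its fields are simplified stand-ins for the actual `PartitionStreamControl` internals):

```scala
import zio._
import zio.stream.Take

// Once the queue has been ended, newly fetched records are dropped instead
// of piling up behind the end token, where they would never be taken out.
final class PartitionQueue[A](queue: Queue[Take[Nothing, A]], ended: Ref[Boolean]) {
  def offerRecords(records: Chunk[A]): UIO[Unit] =
    ZIO.unlessZIO(ended.get)(queue.offer(Take.chunk(records))).unit

  def end: UIO[Unit] =
    ended.set(true) *> queue.offer(Take.end).unit
}
```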

@svroonland (Collaborator, Author)

@erikvanoosten If you have some time to implement those two things, by all means.

@erikvanoosten (Collaborator) commented Apr 13, 2024

> @erikvanoosten If you have some time to implement those two things, by all means.

@svroonland Done in commit 1218204.

Now I am wondering, how can we test this?

@svroonland (Collaborator, Author) commented Apr 14, 2024

Change looks good. Totally forgot to implement this part.

@erikvanoosten (Collaborator)

Depends on zio/zio#8804.

@svroonland svroonland marked this pull request as draft May 23, 2024 05:32
svroonland and others added 11 commits June 5, 2024 12:48
By using ZIO.async, we no longer need a reference to the ZIO runtime, nor do we need the `exec` trickery anymore. Also: fix a typo and make metric descriptions consistent.
When many hundreds of partitions need to be consumed, an excessive amount of heap can be used for pre-fetching. The `ManyPartitionsQueueSizeBasedFetchStrategy` works similarly to the default `QueueSizeBasedFetchStrategy` but limits total memory usage.
Refactoring of the producer so that it handles errors per record.
zio-kafka applications always pre-fetch data so that user streams can process the data asynchronously. This is not compatible with auto commit: when auto commit is enabled, the consumer will automatically commit batches _before_ they are processed by the user streams.

An unaware user might accidentally enable auto commit and lose data during rebalances.

Solves #1289.
## About this PR
📦 Updates [org.scalameta:scalafmt-core](https://github.com/scalameta/scalafmt) from `3.8.2` to `3.8.3`

📜 [GitHub Release Notes](https://github.com/scalameta/scalafmt/releases/tag/v3.8.3) - [Version Diff](scalameta/scalafmt@v3.8.2...v3.8.3)

Co-authored-by: zio-scala-steward[bot] <145262613+zio-scala-steward[bot]@users.noreply.github.com>