Graceful shutdown of a stream for a single subscription #1201

Open · wants to merge 87 commits into base: master

Commits (87)
7a9a6f8
Draft of interface changes
svroonland Mar 24, 2024
c8196b3
Remove deprecated for now
svroonland Mar 24, 2024
583d112
Draft implementation
svroonland Mar 24, 2024
f55c898
Fix implementation + first test
svroonland Mar 24, 2024
6f86958
More tests copied from stopConsumption
svroonland Mar 27, 2024
9eb989b
Alternative interface, workaround inability to deconstruct tuples in …
svroonland Mar 28, 2024
9a32c74
Formatting
svroonland Mar 28, 2024
98df970
Fix doc
svroonland Mar 28, 2024
29f8e44
Add test
svroonland Mar 28, 2024
1215c43
Tweak docs
svroonland Mar 28, 2024
97d7e6f
Add test
svroonland Mar 28, 2024
336aa8d
Move to separate file
svroonland Mar 29, 2024
361cfec
runWithGracefulShutdown
svroonland Mar 30, 2024
4ee9696
Add timeout
svroonland Mar 30, 2024
dfa0afa
Process PR comments
svroonland Apr 1, 2024
15ec438
Fix type constraints
svroonland Apr 1, 2024
885d9c9
Only offer *streamWithGracefulShutdown methods
svroonland Apr 3, 2024
1408148
Pause a partition when its stream is ended
erikvanoosten Apr 13, 2024
ed326a2
More tests
svroonland Apr 14, 2024
252cc3a
Add default value for bufferSize consistently
svroonland Apr 14, 2024
9954dda
Fix race condition between join and timeout, leading to unwanted inte…
svroonland Apr 14, 2024
add21e9
Fix test
svroonland Apr 14, 2024
8ec18c0
Make SubscriptionStreamControl a case class
svroonland Apr 14, 2024
c409368
Update doc
svroonland Apr 14, 2024
2dbddfc
Cleanup
svroonland Apr 14, 2024
7838929
Simplify subscribe
svroonland Apr 15, 2024
c9e48ab
requireRunning false
svroonland Apr 15, 2024
5d334e0
Log unexpected interruption
svroonland Apr 20, 2024
60464b6
Log more
svroonland Apr 20, 2024
258168d
Use partitionedStream
svroonland Apr 21, 2024
6e25cd4
Update pendingRequests and assignedStreams
svroonland Apr 21, 2024
8d19e16
Formatting
erikvanoosten Apr 27, 2024
f1edcab
Fix linting
erikvanoosten Apr 27, 2024
f19417b
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland May 10, 2024
5365aa0
Merge branch 'master' into subscription-stream-control
svroonland May 11, 2024
f977001
Merge branch 'master' into subscription-stream-control
svroonland May 19, 2024
c54b3e9
This works
svroonland May 20, 2024
33a6d82
This works with timeout
svroonland May 20, 2024
a155267
Remove unused annotation
svroonland May 20, 2024
15e041f
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 5, 2024
7c21f82
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 16, 2024
f5e42c5
Merge branch 'master' into subscription-stream-control
svroonland Jul 14, 2024
9a31569
Small improvements to the Producer (#1272)
erikvanoosten Jul 14, 2024
27f033e
Document metrics and consumer tuning based on metrics (#1280)
erikvanoosten Jul 14, 2024
108b285
Add alternative fetch strategy for many partitions (#1281)
erikvanoosten Jul 16, 2024
eaae8af
Alternative producer implementation (#1285)
erikvanoosten Jul 18, 2024
c862686
Prevent users from enabling auto commit (#1290)
erikvanoosten Jul 24, 2024
ff4ea7f
Update scalafmt-core to 3.8.3 (#1291)
zio-scala-steward[bot] Jul 26, 2024
bbbfe48
Upgrade to 2.1.7+11-854102ae-SNAPSHOT with ZStream finalization fix
svroonland Aug 10, 2024
a75a78e
Add sonatype snapshots
svroonland Aug 10, 2024
5fec195
Bump ZIO version
svroonland Oct 10, 2024
699e6e8
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 10, 2024
aafd4ec
Revert stuff
svroonland Oct 10, 2024
34f5110
Bump
svroonland Oct 20, 2024
95f9bef
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 20, 2024
f33dc34
Fix race condition when removing subscription
svroonland Nov 2, 2024
3cb1eee
Tweak
svroonland Nov 2, 2024
87eadd8
Tweak
svroonland Nov 2, 2024
31cd086
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
a6d2afa
Increase timeout
svroonland Nov 2, 2024
1835758
This seems to work
svroonland Nov 2, 2024
6c28b89
Cleanup
svroonland Nov 2, 2024
e649753
Cleanup
svroonland Nov 2, 2024
348b01e
Restore old methods so we can offer this as experimental functionality
svroonland Nov 2, 2024
c9f1597
Rename methods + add some doc
svroonland Nov 2, 2024
4a9b6f6
More renames
svroonland Nov 2, 2024
3c0cfd1
Fix rebalanceSafeCommits test timing out
svroonland Nov 2, 2024
90ca347
Forgot to commit this file
svroonland Nov 2, 2024
edb7005
Fix shutdown behavior
svroonland Nov 2, 2024
e400012
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
e83bc89
Apply suggestions from code review
svroonland Nov 2, 2024
efd3937
Restore stuff
svroonland Nov 2, 2024
e29d63e
Cleanup
svroonland Nov 2, 2024
f19f90d
Update docs/consuming-kafka-topics-using-zio-streams.md
svroonland Nov 3, 2024
c71a08f
PR comments
svroonland Nov 3, 2024
5c27da7
Do not empty commits when stopping all streams
svroonland Nov 5, 2024
9901b16
Revert change
svroonland Nov 5, 2024
a9925d5
Add assertion before poll, add some documenting comments
svroonland Nov 9, 2024
dac1865
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 9, 2024
ddbd576
Stronger test
svroonland Nov 9, 2024
8e461ee
Do not clear assignedStreams when ending streams by subscription (ins…
svroonland Nov 9, 2024
5848e9c
Fix interruption issue
svroonland Nov 9, 2024
143f914
Log timeout error + cleanup
svroonland Nov 9, 2024
d5fc7bb
Fix test compilation withFilter issue
svroonland Nov 9, 2024
d6f485c
Fix doc syntax
svroonland Nov 9, 2024
5ef97ef
Fix test
svroonland Nov 9, 2024
bec0e25
Update comment
svroonland Nov 10, 2024
28 changes: 28 additions & 0 deletions docs/consuming-kafka-topics-using-zio-streams.md
@@ -64,3 +64,31 @@ Consumer.partitionedStream(Subscription.topics("topic150"), Serde.string, Serde.
```

When using `partitionedStream` with `flatMapPar(n)`, it is recommended to set `n` to `Int.MaxValue`. `n` must be equal to or greater than the number of partitions your consumer subscribes to; otherwise there will be unhandled partitions and Kafka will eventually evict your consumer.

## Controlled shutdown

The examples above keep processing records forever, or until the fiber is interrupted, typically at application shutdown. When interrupted, some records may be 'in-flight', that is, being processed by one of the stages of your consumer stream's user code. Those records will be only partially processed and their offsets may not be committed. For a fast shutdown in an at-least-once processing scenario this is fine.

zio-kafka also supports a _graceful shutdown_, in which fetching of records for the subscribed topics/partitions is stopped, the streams are ended, and all downstream stages are completed, allowing in-flight records to be fully processed.

Use the `with*Stream` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These methods accept a parameter that describes the execution of a stream, which is gracefully ended when the method is interrupted.

```scala
import zio.Console.printLine
import zio.kafka.consumer._

Consumer.withPartitionedStream(
  Subscription.topics("topic150"),
  Serde.string,
  Serde.string
) { stream =>
  stream
    .flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
      partitionStream
        .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
        .map(_.offset)
    }
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(_.commit)
    .runDrain
}
```
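The same pattern applies to a non-partitioned stream. A minimal sketch, assuming `Consumer.withPlainStream` takes the same subscription and deserializer parameters as the partitioned variant above (as exercised in this PR's tests):

```scala
import zio.Console.printLine
import zio.kafka.consumer._

// Sketch: graceful shutdown for a plain (non-partitioned) stream.
// When the surrounding fiber is interrupted, fetching stops, the stream
// ends, and the commit stages below still complete for in-flight records.
Consumer.withPlainStream(
  Subscription.topics("topic150"),
  Serde.string,
  Serde.string
) { stream =>
  stream
    .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
    .map(_.offset)
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(_.commit)
    .runDrain
}
```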
@@ -355,6 +355,203 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
} yield assertTrue(offset.map(_.offset).contains(9L))
} @@ TestAspect.nonFlaky(2),
suite("streamWithGracefulShutdown")(
test("runWithGracefulShutdown must end streams while still processing commits") {
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
for {
group <- randomGroup
client <- randomClient
topic <- randomTopic
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = 5))
processedMessageOffsets <- Ref.make(Chunk.empty[(TopicPartition, Long)])
results <- ZIO.scoped {
for {
stop <- Promise.make[Nothing, Unit]
fib <-
Consumer
.withPartitionedStream[Any, String, String](
Subscription.topics(topic),
Serde.string,
Serde.string
) { stream =>
stream
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
partitionStream.mapConcatZIO { record =>
for {
nr <- processedMessageOffsets
.updateAndGet(
_ :+ (tp -> record.offset.offset)
)
.map(_.size)
_ <- stop.succeed(()).when(nr == 10)
} yield Seq(record.offset)
}
}
.transduce(Consumer.offsetBatches)
.mapZIO(batch =>
ZIO.logDebug("Starting batch commit") *> batch.commit
.tapErrorCause(
ZIO.logErrorCause(
s"Error doing commit of batch ${batch.offsets}",
_
)
) *> ZIO.logDebug("Commit done")
)
.runDrain
}
.forkScoped
_ <- produceMany(topic, kvs)
_ <- scheduledProduce(
topic,
Schedule.fixed(500.millis).jittered
).runDrain.forkScoped
_ <- stop.await *> fib.interrupt
processedOffsets <- processedMessageOffsets.get
latestProcessedOffsets =
processedOffsets.groupBy(_._1).map { case (tp, values) =>
tp -> values.map(_._2).maxOption.getOrElse(0L)
}
tps = processedOffsets.map { case (tp, _) => tp }.toSet
committedOffsets <- Consumer.committed(tps)
} yield (latestProcessedOffsets, committedOffsets)
}.provideSomeLayer[Kafka with Producer](
consumer(
client,
Some(group),
commitTimeout = 2.seconds // Detect issues with commits earlier
)
)
(processedOffsets, committedOffsets) = results
} yield assertTrue(processedOffsets.forall { case (tp, offset) =>
committedOffsets.get(tp).flatMap(_.map(_.offset())).contains(offset + 1)
})
} @@ nonFlaky(10),
test(
"it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
) {
// NOTE:
// When this test fails with the message `100000 was not less than 100000`, it's because
// your computer is so fast that the first consumer already consumed all 100000 messages.
val numberOfMessages: Int = 100000
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceMany(topic, kvs)
// Starting a consumption session to start the Runloop
consumed0 <- ZIO.scoped {
for {
consumed <- Ref.make(0L)
fiber <-
consumer
.withPlainStream(
Subscription.manual(topic -> 0),
Serde.string,
Serde.string
) { stream =>
stream
.tap(_ => consumed.update(_ + 1))
.runDrain
}
.forkScoped
_ <- ZIO.sleep(200.millis)
_ <- fiber.interrupt
consumed0 <- consumed.get
_ <- ZIO.logDebug(s"consumed0: $consumed0")
} yield consumed0
}

_ <- ZIO.logDebug("About to sleep 5 seconds")
_ <- ZIO.sleep(5.seconds)
_ <- ZIO.logDebug("Slept 5 seconds")
consumed1 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(numberOfMessages.toLong)
.runCount
} yield assert(consumed0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) &&
assert(consumed1)(equalTo(numberOfMessages.toLong))

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
equalTo(
Chunk(
SubscriptionFinalized,
RunloopFinalized,
ConsumerFinalized
)
)
)
},
test("can stop one stream while keeping another one running") {
val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i"))
for {
topic1 <- randomTopic
topic2 <- randomTopic
client <- randomClient
group <- randomGroup

_ <- produceMany(topic1, kvs)
_ <- produceMany(topic2, kvs)
_ <- ZIO.scoped {
for {
stream1Started <- Promise.make[Nothing, Unit]
stream1Done <- Promise.make[Nothing, Unit]
stream1Interrupted <- Promise.make[Nothing, Unit]
stream1Fib <- ZIO.logAnnotate("stream", "1") {
(Consumer
.withPlainStream(
Subscription.topics(topic1),
Serde.string,
Serde.string
) { stream =>
stream
.tap(_ => stream1Started.succeed(()))
.zipWithIndex
.map(_._2)
.runDrain
}
.tapErrorCause(c => ZIO.logErrorCause("Stream 1 failed", c))
.ensuring(stream1Done.succeed(())))
.forkScoped
}
_ <- stream1Started.await
_ <- ZIO.logAnnotate("stream", "2") {
Consumer
.withPlainStream(
Subscription.topics(topic2),
Serde.string,
Serde.string
) { stream =>
stream.zipWithIndex
.map(_._2)
.tap(count =>
(stream1Fib.interrupt <* stream1Interrupted.succeed(())).when(count == 4)
)
.runDrain
}
.tapErrorCause(c => ZIO.logErrorCause("Stream 2 failed", c))
.forkScoped
}
_ <- stream1Interrupted.await
_ <- produceMany(topic1, kvs)
_ <- stream1Done.await
.tapErrorCause(c => ZIO.logErrorCause("Stream 1 await failed", c))
} yield ()
}.provideSomeLayer[Kafka with Scope with Producer](consumer(client, Some(group)))
} yield assertCompletes
} @@ nonFlaky(10)
),
test("a consumer timeout interrupts the stream and shuts down the consumer") {
// Setup of this test:
// - Set the max poll interval very low: a couple of seconds.
@@ -1557,6 +1754,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded
-    ) @@ withLiveClock @@ timeout(2.minutes)
+    ) @@ withLiveClock @@ timeout(10.minutes)

}