diff --git a/docs/consuming-kafka-topics-using-zio-streams.md b/docs/consuming-kafka-topics-using-zio-streams.md index dcdeb87c2..f6fa93907 100644 --- a/docs/consuming-kafka-topics-using-zio-streams.md +++ b/docs/consuming-kafka-topics-using-zio-streams.md @@ -64,3 +64,31 @@ Consumer.partitionedStream(Subscription.topics("topic150"), Serde.string, Serde. ``` When using partitionedStream with `flatMapPar(n)`, it is recommended to set n to `Int.MaxValue`. N must be equal or greater than the number of partitions your consumer subscribes to otherwise there'll be unhandled partitions and Kafka will eventually evict your consumer. + +## Controlled shutdown + +The examples above will keep processing records forever, or until the fiber is interrupted, typically at application shutdown. When interrupted, some records may be 'in-flight', e.g. being processed by one of the stages of your consumer stream user code. Those records will be processed partly and their offsets may not be committed. For fast shutdown in an at-least-once processing scenario this is fine. + +zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing in-flight records to be fully processed. + +Use the `with*Stream` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These methods accept a parameter that describes the execution of a stream, which is gracefully ended when the method is interrupted. + +```scala +import zio.Console.printLine +import zio.kafka.consumer._ + +Consumer.withPartitionedStream( + Subscription.topics("topic150"), + Serde.string, + Serde.string +) { stream => + stream.flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) => + partitionStream + .tap(record => printLine(s"key: ${record.key}, value: ${record.value}")) + .map(_.offset) + } + .aggregateAsync(Consumer.offsetBatches) + .mapZIO(_.commit) + .runDrain +} +``` \ No newline at end of file diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 4fae127a0..3d66d7815 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -355,6 +355,203 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ) } yield assertTrue(offset.map(_.offset).contains(9L)) } @@ TestAspect.nonFlaky(2), + suite("streamWithGracefulShutdown")( + test("runWithGracefulShutdown must end streams while still processing commits") { + val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i")) + for { + group <- randomGroup + client <- randomClient + topic <- randomTopic + _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = 5)) + processedMessageOffsets <- Ref.make(Chunk.empty[(TopicPartition, Long)]) + results <- ZIO.scoped { + for { + stop <- Promise.make[Nothing, Unit] + fib <- + Consumer + .withPartitionedStream[Any, String, String]( + Subscription.topics(topic), + Serde.string, + Serde.string + ) { stream => + stream + .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => + partitionStream.mapConcatZIO { record => + for { + nr <- processedMessageOffsets + .updateAndGet( + _ :+ (tp -> record.offset.offset) + ) + .map(_.size) + _ <- stop.succeed(()).when(nr == 10) + } yield Seq(record.offset) + } + } + .transduce(Consumer.offsetBatches) + .mapZIO(batch => + 
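+                            // Commit each accumulated offset batch; the debug logs and tapErrorCause below make commit progress and failures visible while the streams are shut down gracefully.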
ZIO.logDebug("Starting batch commit") *> batch.commit + .tapErrorCause( + ZIO.logErrorCause( + s"Error doing commit of batch ${batch.offsets}", + _ + ) + ) *> ZIO.logDebug("Commit done") + ) + .runDrain + } + .forkScoped + _ <- produceMany(topic, kvs) + _ <- scheduledProduce( + topic, + Schedule.fixed(500.millis).jittered + ).runDrain.forkScoped + _ <- stop.await *> fib.interrupt + processedOffsets <- processedMessageOffsets.get + latestProcessedOffsets = + processedOffsets.groupBy(_._1).map { case (tp, values) => + tp -> values.map(_._2).maxOption.getOrElse(0L) + } + tps = processedOffsets.map { case (tp, _) => tp }.toSet + committedOffsets <- Consumer.committed(tps) + } yield (latestProcessedOffsets, committedOffsets) + }.provideSomeLayer[Kafka with Producer]( + consumer( + client, + Some(group), + commitTimeout = 2.seconds // Detect issues with commits earlier + ) + ) + (processedOffsets, committedOffsets) = results + } yield assertTrue(processedOffsets.forall { case (tp, offset) => + committedOffsets.get(tp).flatMap(_.map(_.offset())).contains(offset + 1) + }) + } @@ nonFlaky(10), + test( + "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously" + ) { + // NOTE: + // When this test fails with the message `100000 was not less than 100000`, it's because + // your computer is so fast that the first consumer already consumed all 100000 messages. + val numberOfMessages: Int = 100000 + val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) + + def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] = + for { + clientId <- randomClient + topic <- randomTopic + settings <- consumerSettings(clientId = clientId) + consumer <- Consumer.make(settings, diagnostics = diagnostics) + _ <- produceMany(topic, kvs) + // Starting a consumption session to start the Runloop + consumed0 <- ZIO.scoped { + for { + consumed <- Ref.make(0L) + fiber <- + consumer + .withPlainStream( + Subscription.manual(topic -> 0), + Serde.string, + Serde.string + ) { stream => + stream + .tap(_ => consumed.update(_ + 1)) + .runDrain + } + .forkScoped + _ <- ZIO.sleep(200.millis) + _ <- fiber.interrupt + consumed0 <- consumed.get + _ <- ZIO.logDebug(s"consumed0: $consumed0") + } yield consumed0 + } + + _ <- ZIO.logDebug("About to sleep 5 seconds") + _ <- ZIO.sleep(5.seconds) + _ <- ZIO.logDebug("Slept 5 seconds") + consumed1 <- consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .take(numberOfMessages.toLong) + .runCount + } yield assert(consumed0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) && + assert(consumed1)(equalTo(numberOfMessages.toLong)) + + for { + diagnostics <- Diagnostics.SlidingQueue.make(1000) + testResult <- ZIO.scoped { + test(diagnostics) + } + finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization])) + } yield testResult && assert(finalizationEvents)( + // The order is very important. + // The subscription must be finalized before the runloop, otherwise it creates a deadlock. 
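+              // The Finalization diagnostics events collected above must therefore arrive in exactly this order: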
+ equalTo( + Chunk( + SubscriptionFinalized, + RunloopFinalized, + ConsumerFinalized + ) + ) + ) + }, + test("can stop one stream while keeping another one running") { + val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i")) + for { + topic1 <- randomTopic + topic2 <- randomTopic + client <- randomClient + group <- randomGroup + + _ <- produceMany(topic1, kvs) + _ <- produceMany(topic2, kvs) + _ <- ZIO.scoped { + for { + stream1Started <- Promise.make[Nothing, Unit] + stream1Done <- Promise.make[Nothing, Unit] + stream1Interrupted <- Promise.make[Nothing, Unit] + stream1Fib <- ZIO.logAnnotate("stream", "1") { + (Consumer + .withPlainStream( + Subscription.topics(topic1), + Serde.string, + Serde.string + ) { stream => + stream + .tap(_ => stream1Started.succeed(())) + .zipWithIndex + .map(_._2) + .runDrain + } + .tapErrorCause(c => ZIO.logErrorCause("Stream 1 failed", c)) + .ensuring(stream1Done.succeed(()))) + .forkScoped + } + _ <- stream1Started.await + _ <- ZIO.logAnnotate("stream", "2") { + Consumer + .withPlainStream( + Subscription.topics(topic2), + Serde.string, + Serde.string + ) { stream => + stream.zipWithIndex + .map(_._2) + .tap(count => + (stream1Fib.interrupt <* stream1Interrupted.succeed(())).when(count == 4) + ) + .runDrain + } + .tapErrorCause(c => ZIO.logErrorCause("Stream 2 failed", c)) + .forkScoped + } + _ <- stream1Interrupted.await + _ <- produceMany(topic1, kvs) + _ <- stream1Done.await + .tapErrorCause(c => ZIO.logErrorCause("Stream 1 await failed", c)) + } yield () + }.provideSomeLayer[Kafka with Scope with Producer](consumer(client, Some(group))) + } yield assertCompletes + } @@ nonFlaky(10) + ), test("a consumer timeout interrupts the stream and shuts down the consumer") { // Setup of this test: // - Set the max poll interval very low: a couple of seconds. @@ -1557,6 +1754,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSome[Scope & Kafka](producer) .provideSomeShared[Scope]( Kafka.embedded - ) @@ withLiveClock @@ timeout(2.minutes) + ) @@ withLiveClock @@ timeout(10.minutes) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 776e403e8..297341223 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -70,6 +70,26 @@ trait Consumer { valueDeserializer: Deserializer[R, V] ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] + /** + * Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown. + * + * When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by + * `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight + * messages. 
+ */ + def withPartitionedAssignmentStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + shutdownTimeout: Duration = 15.seconds + )( + withStream: Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] => ZIO[ + R, + Throwable, + Any + ] + ): ZIO[R, Throwable, Any] + /** * Create a stream with messages on the subscribed topic-partitions by topic-partition * @@ -93,6 +113,26 @@ trait Consumer { valueDeserializer: Deserializer[R, V] ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] + /** + * Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown. + * + * When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by + * `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight + * messages. + */ + def withPartitionedStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + shutdownTimeout: Duration = 15.seconds + )( + withStream: Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] => ZIO[ + R, + Throwable, + Any + ] + ): ZIO[R, Throwable, Any] + /** * Create a stream with all messages on the subscribed topic-partitions * @@ -111,6 +151,7 @@ trait Consumer { * On completion of the stream, the consumer is unsubscribed. In case of multiple subscriptions, the total consumer * subscription is changed to exclude this subscription. */ + def plainStream[R, K, V]( subscription: Subscription, keyDeserializer: Deserializer[R, K], @@ -118,6 +159,23 @@ trait Consumer { bufferSize: Int = 4 ): ZStream[R, Throwable, CommittableRecord[K, V]] + /** + * Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown + * + * When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by + * `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight + * messages. + */ + def withPlainStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + bufferSize: Int = 4, + shutdownTimeout: Duration = 15.seconds + )( + withStream: ZStream[R, Throwable, CommittableRecord[K, V]] => ZIO[R, Throwable, Any] + ): ZIO[R, Throwable, Any] + /** * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit * requests. 
@@ -261,6 +319,30 @@ object Consumer { ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer)) + /** + * Accessor method + */ + def withPartitionedAssignmentStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + shutdownTimeout: Duration = 15.seconds + )( + withStream: Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] => ZIO[ + R, + Throwable, + Any + ] + ): ZIO[R with Consumer, Throwable, Any] = + ZIO.serviceWithZIO[Consumer]( + _.withPartitionedAssignmentStream( + subscription, + keyDeserializer, + valueDeserializer, + shutdownTimeout + )(withStream) + ) + /** * Accessor method */ @@ -275,6 +357,27 @@ object Consumer { ] = ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer)) + /** + * Accessor method + */ + def withPartitionedStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + shutdownTimeout: Duration = 15.seconds + )( + withStream: Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] => ZIO[ + R, + Throwable, + Any + ] + ): ZIO[R with Consumer, Throwable, Any] = + ZIO.serviceWithZIO[Consumer]( + _.withPartitionedStream(subscription, keyDeserializer, valueDeserializer, shutdownTimeout)( + withStream + ) + ) + /** * Accessor method */ @@ -288,6 +391,24 @@ object Consumer { _.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize) ) + /** + * Accessor method + */ + def withPlainStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + bufferSize: Int = 4, + shutdownTimeout: Duration = 15.seconds + )( + withStream: ZStream[R, Throwable, CommittableRecord[K, V]] => ZIO[R, Throwable, Any] + ): ZIO[R with Consumer, Throwable, Any] = + ZIO.serviceWithZIO[Consumer]( + _.withPlainStream(subscription, keyDeserializer, valueDeserializer, bufferSize, shutdownTimeout)( + withStream + ) + ) + /** * Accessor method */ @@ -474,7 +595,8 @@ private[consumer] final class ConsumerLive private[consumer] ( ZStream.unwrapScoped { for { - stream <- runloopAccess.subscribe(subscription) + streamControl <- runloopAccess.subscribe(subscription) + stream = streamControl.stream } yield stream .map(_.exit) .flattenExitOption @@ -492,24 +614,163 @@ private[consumer] final class ConsumerLive private[consumer] ( } } + private def partitionedAssignmentStreamWithControl[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V] + ): ZIO[Scope, Throwable, SubscriptionStreamControl[ + Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] + ]] = { + val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray) + for { + streamControl <- runloopAccess.subscribe(subscription) + } yield SubscriptionStreamControl( + streamControl.stream + .map(_.exit) + .flattenExitOption + .map { + _.collect { + case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) => + val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = + if (onlyByteArraySerdes) + partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] + else 
partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) + + tp -> stream + } + }, + streamControl.stop + ) + } + + override def withPartitionedAssignmentStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + shutdownTimeout: Duration = 15.seconds + )( + withStream: Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] => ZIO[ + R, + Throwable, + Any + ] + ): ZIO[R, Throwable, Any] = runWithGracefulShutdown[Stream[Throwable, Chunk[ + (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]]) + ]], R, Throwable]( + partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer), + shutdownTimeout + )( + withStream + ) + override def partitionedStream[R, K, V]( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] = + ): Stream[ + Throwable, + ( + TopicPartition, + ZStream[R, Throwable, CommittableRecord[K, V]] + ) + ] = partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks + override def withPartitionedStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + shutdownTimeout: Duration = 15.seconds + )( + withStream: Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] => ZIO[ + R, + Throwable, + Any + ] + ): ZIO[R, Throwable, Any] = + withPartitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) { stream => + withStream(stream.flattenChunks) + } + override def plainStream[R, K, V]( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], - bufferSize: Int + bufferSize: Int = 4 ): ZStream[R, Throwable, CommittableRecord[K, V]] = partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar( n = Int.MaxValue, bufferSize = bufferSize )(_._2) + override def withPlainStream[R, K, V]( + subscription: Subscription, + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V], + bufferSize: Int = 4, + shutdownTimeout: Duration = 15.seconds + )( + withStream: ZStream[R, Throwable, CommittableRecord[K, V]] => ZIO[R, Throwable, Any] + ): ZIO[R, Throwable, Any] = + withPartitionedStream(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) { partitionedStream => + withStream( + partitionedStream.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2) + ) + } + + /** + * Takes a SubscriptionStreamControl for some stream and runs the given ZIO workflow on that stream such that, when + * interrupted, stops fetching records and gracefully waits for the ZIO workflow to complete. + * + * @param streamControl + * Result of `withPlainStream`, `withPartitionedStream` or `withPartitionedAssignmentStream` + * @param shutdownTimeout + * Timeout for the workflow to complete after initiating the graceful shutdown + * @param withStream + * Takes the stream as input and returns a ZIO workflow that processes the stream. As in most programs the given + * workflow runs until an external interruption, the result value (Any type) is meaningless. 
`withStream` is + * typically something like `stream => stream.mapZIO(record => ZIO.debug(record)).mapZIO(_.offset.commit)` + */ + private def runWithGracefulShutdown[StreamType <: ZStream[_, _, _], R, E]( + streamControl: ZIO[Scope, E, SubscriptionStreamControl[StreamType]], + shutdownTimeout: Duration + )( + withStream: StreamType => ZIO[R, E, Any] + ): ZIO[R, E, Any] = + ZIO.scoped[R] { + for { + control <- streamControl + fib <- + withStream(control.stream) + .onInterrupt( + ZIO.logError("withStream in runWithGracefulShutdown was interrupted, this should not happen") + ) + .tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause)) + .forkScoped + result <- + fib.join.onInterrupt( + control.stop *> + fib.join + .timeout(shutdownTimeout) + .someOrElseZIO( + ZIO.logError( + "Timeout joining withStream fiber in runWithGracefulShutdown. Not all pending commits may have been processed." + ) + ) + .tapErrorCause(cause => + ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) + ) + // Workaround for https://github.com/zio/zio/issues/9288 + .forkDaemon + .flatMap(_.join) + .tapErrorCause(cause => + ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) + ) + .ignore + ) + } yield result + } + override def stopConsumption: UIO[Unit] = ZIO.logDebug("stopConsumption called") *> runloopAccess.stopConsumption diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala new file mode 100644 index 000000000..1e041cdef --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala @@ -0,0 +1,15 @@ +package zio.kafka.consumer +import zio.UIO +import zio.stream.ZStream + +/** + * Allows graceful shutdown of a stream, where no more records are being fetched but the in-flight records can continue + * to be processed and their offsets committed. + * + * @param stream + * The stream of partitions / records for this subscription + * @param stop + * Stop fetching records for the subscribed topic-partitions and end the associated streams, while allowing commits to + * proceed (consumer remains subscribed) + */ +final private[consumer] case class SubscriptionStreamControl[S <: ZStream[_, _, _]](stream: S, stop: UIO[Unit]) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 85a83eb19..84e438c8b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -52,16 +52,20 @@ final class PartitionStreamControl private ( /** Offer new data for the stream to process. 
Should be called on every poll, also when `data.isEmpty` */ private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] = - if (data.isEmpty) { - queueInfoRef.update(_.withEmptyPoll) - } else { - for { - now <- Clock.nanoTime - newPullDeadline = now + maxPollIntervalNanos - _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) - _ <- dataQueue.offer(Take.chunk(data)) - } yield () - } + ZIO + .whenZIO(isRunning) { + if (data.isEmpty) { + queueInfoRef.update(_.withEmptyPoll) + } else { + for { + now <- Clock.nanoTime + newPullDeadline = now + maxPollIntervalNanos + _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) + _ <- dataQueue.offer(Take.chunk(data)) + } yield () + } + } + .unit def queueSize: UIO[Int] = queueInfoRef.get.map(_.size) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 35cf60fa5..54b8486bc 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -56,8 +56,7 @@ private[consumer] final class Runloop private ( .offerAll( Chunk( RunloopCommand.RemoveAllSubscriptions, - RunloopCommand.StopAllStreams, - RunloopCommand.StopRunloop + RunloopCommand.StopAllStreams ) ) .unit @@ -72,8 +71,19 @@ private[consumer] final class Runloop private ( _ <- ZIO.logDebug(s"Done for subscription $subscription") } yield () - private[internal] def removeSubscription(subscription: Subscription): UIO[Unit] = - commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit + private[internal] def removeSubscription(subscription: Subscription): Task[Unit] = + for { + promise <- Promise.make[Throwable, Unit] + _ <- commandQueue.offer(RunloopCommand.RemoveSubscription(subscription, promise)) + _ <- promise.await + } yield () + + private[internal] def endStreamsBySubscription(subscription: Subscription): UIO[Unit] = + for { + promise <- Promise.make[Nothing, Unit] + _ <- commandQueue.offer(RunloopCommand.EndStreamsBySubscription(subscription, promise)).unit + _ <- promise.await + } yield () private def makeRebalanceListener: ConsumerRebalanceListener = { // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. 
This @@ -463,16 +473,19 @@ private[consumer] final class Runloop private ( private def handlePoll(state: State): Task[State] = { for { - partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) + runningStreamsBeforePoll <- ZIO.filter(state.assignedStreams)(_.isRunning) + partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(runningStreamsBeforePoll) _ <- ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and" + s" ${state.pendingCommits.size} pending commits," + - s" resuming $partitionsToFetch partitions" + s" resuming ${partitionsToFetch.size} out of ${state.assignedStreams.size} partitions" ) _ <- currentStateRef.set(state) pollResult <- consumer.runloopAccess { c => for { + assignment <- ZIO.attempt(c.assignment()) + _ <- verifyAssignedStreamsMatchesAssignment(state.assignedStreams, assignment.asScala.toSet) resumeAndPauseCounts <- resumeAndPausePartitions(c, partitionsToFetch) (toResumeCount, toPauseCount) = resumeAndPauseCounts @@ -562,16 +575,7 @@ private[consumer] final class Runloop private ( ) ) // Ensure that all assigned partitions have a stream and no streams are present for unassigned streams - _ <- - ZIO - .logWarning( - s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}" - ) - .when( - currentAssigned != updatedAssignedStreams - .map(_.tp) - .toSet || currentAssigned.size != updatedAssignedStreams.size - ) + _ <- verifyAssignedStreamsMatchesAssignment(updatedAssignedStreams, currentAssigned) } yield Runloop.PollResult( records = polledRecords, ignoreRecordsForTps = ignoreRecordsForTps, @@ -596,6 +600,16 @@ private[consumer] final class Runloop private ( ) } + private def verifyAssignedStreamsMatchesAssignment( + assignedStreams: Chunk[PartitionStreamControl], + currentAssigned: Set[TopicPartition] + ) = + ZIO + .logWarning( + s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${assignedStreams.map(_.tp).mkString(",")}" + ) + .when(currentAssigned != assignedStreams.map(_.tp).toSet || currentAssigned.size != assignedStreams.size) + /** * Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded * its poll interval, shutdown the consumer. 
@@ -606,9 +620,18 @@ private[consumer] final class Runloop private ( anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) => stream .maxPollIntervalExceeded(now) - .tap(exceeded => if (exceeded) stream.halt else ZIO.unit) + .tap { exceeded => + (ZIO + .logWarning( + s"Stream for ${stream.tp} has not pulled from upstream during the max poll interval" + ) *> stream.halt) + .when(exceeded) + } .map(acc || _) } + _ <- ZIO + .logWarning("Shutting down Runloop because one or more streams exceeded the max poll interval") + .when(anyExceeded) _ <- shutdown.when(anyExceeded) } yield () @@ -661,8 +684,20 @@ private[consumer] final class Runloop private ( cmd.succeed *> doChangeSubscription(newSubState) } } - case RunloopCommand.RemoveSubscription(subscription) => - state.subscriptionState match { + case RunloopCommand.EndStreamsBySubscription(subscription, cont) => + ZIO.foreachDiscard( + state.assignedStreams.filter(stream => Subscription.subscriptionMatches(subscription, stream.tp)) + )(_.end) *> cont + .succeed(()) + .as( + state.copy( + pendingRequests = + state.pendingRequests.filterNot(req => Subscription.subscriptionMatches(subscription, req.tp)) + ) + ) + + case RunloopCommand.RemoveSubscription(subscription, cont) => + (state.subscriptionState match { case SubscriptionState.NotSubscribed => ZIO.succeed(state) case SubscriptionState.Subscribed(existingSubscriptions, _) => val newUnion: Option[(Subscription, NonEmptyChunk[Subscription])] = @@ -679,9 +714,11 @@ private[consumer] final class Runloop private ( ZIO.logDebug(s"Unsubscribing kafka consumer") *> doChangeSubscription(SubscriptionState.NotSubscribed) } - } + }).tapBoth(cont.fail, _ => cont.succeed(()).unit) case RunloopCommand.RemoveAllSubscriptions => doChangeSubscription(SubscriptionState.NotSubscribed) - case RunloopCommand.StopAllStreams => + case RunloopCommand.StopAllStreams => + // End all partition streams and any pending requests. Keep the runloop running so that commits for in-flight + // records can still be processed. Keep assigned streams as is to be consistent with consumer assignment for { _ <- ZIO.logDebug("Stop all streams initiated") _ <- ZIO.foreachDiscard(state.assignedStreams)(_.end) @@ -910,6 +947,7 @@ object Runloop { _ <- ZIO.addFinalizer( ZIO.logDebug("Shutting down Runloop") *> runloop.shutdown *> + commandQueue.offer(RunloopCommand.StopRunloop) *> waitForRunloopStop <* ZIO.logDebug("Shut down Runloop") ) @@ -918,6 +956,13 @@ object Runloop { private[internal] final case class State( pendingRequests: Chunk[RunloopCommand.Request], pendingCommits: Chunk[Runloop.Commit], + + /* + * Streams for partitions that are currently assigned to this consumer. + * + * Before and after each rebalance, it should be consistent with the partitions that are assigned according to the + * underlying KafkaConsumer. 
+ */ assignedStreams: Chunk[PartitionStreamControl], subscriptionState: SubscriptionState ) { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 6c90d2d1c..8269bdf4f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -6,7 +6,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription } +import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription, SubscriptionStreamControl } import zio.stream.{ Stream, Take, UStream, ZStream } import zio._ @@ -52,19 +52,26 @@ private[consumer] final class RunloopAccess private ( * there's no mistake possible for the caller. * * The external world (Consumer) doesn't need to know how we "subscribe", "unsubscribe", etc. internally. + * + * @returns + * A SubscriptionStreamControl which allows graceful shutdown of all streams created from this subscription */ def subscribe( subscription: Subscription - ): ZIO[Scope, InvalidSubscriptionUnion, UStream[Take[Throwable, PartitionAssignment]]] = + ): ZIO[Scope, InvalidSubscriptionUnion, SubscriptionStreamControl[UStream[Take[Throwable, PartitionAssignment]]]] = for { + end <- Promise.make[Nothing, Unit] stream <- ZStream.fromHubScoped(partitionHub) // starts the Runloop if not already started _ <- withRunloopZIO(requireRunning = true)(_.addSubscription(subscription)) _ <- ZIO.addFinalizer { - withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription)) <* + withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription).orDie) <* diagnostics.emit(Finalization.SubscriptionFinalized) } - } yield stream + } yield SubscriptionStreamControl( + stream = stream.merge(ZStream.fromZIO(end.await).as(Take.end)), + stop = withRunloopZIO(requireRunning = false)(_.endStreamsBySubscription(subscription)) *> end.succeed(()).ignore + ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index be43f585c..18366fe7a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -30,6 +30,11 @@ object RunloopCommand { @inline def succeed: UIO[Unit] = cont.succeed(()).unit @inline def fail(e: InvalidSubscriptionUnion): UIO[Unit] = cont.fail(e).unit } - final case class RemoveSubscription(subscription: Subscription) extends StreamCommand - case object RemoveAllSubscriptions extends StreamCommand + final case class RemoveSubscription(subscription: Subscription, cont: Promise[Throwable, Unit]) extends StreamCommand + + final case class EndStreamsBySubscription(subscription: Subscription, cont: Promise[Nothing, Unit]) + extends StreamCommand { + @inline def succeed: UIO[Unit] = cont.succeed(()).unit + } + case object RemoveAllSubscriptions extends StreamCommand }
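
For reference, the documentation example above uses `withPartitionedStream`; below is a minimal sketch of the simpler `withPlainStream` variant introduced by this change, including its `shutdownTimeout` parameter. It assumes a `Consumer` layer is provided elsewhere; the topic name and the overridden timeout value are illustrative, not taken from the patch.

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Minimal sketch, assuming a Consumer layer is provided elsewhere.
// "my-topic" and the 30 second timeout are illustrative values.
val consumeUntilInterrupted: ZIO[Consumer, Throwable, Any] =
  Consumer.withPlainStream(
    Subscription.topics("my-topic"),
    Serde.string,
    Serde.string,
    shutdownTimeout = 30.seconds // the new methods default to 15.seconds
  ) { stream =>
    stream
      .mapZIO(record => Console.printLine(s"${record.key}: ${record.value}").as(record.offset))
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .runDrain
  }

// Interrupting the fiber that runs `consumeUntilInterrupted` stops record fetching,
// ends the stream, and waits (up to `shutdownTimeout`) for in-flight records to be
// processed and their offsets committed before the effect returns.
```

If processing does not finish within `shutdownTimeout`, `runWithGracefulShutdown` logs an error and stops waiting, so some pending commits may not have been processed.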