Skip to content

Commit

Permalink
Update pendingRequests and assignedStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland authored and erikvanoosten committed Apr 27, 2024
1 parent 258168d commit 6e25cd4
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Serde.string
) { stream =>
stream
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
.flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
partitionStream.mapConcatZIO { record =>
for {
nr <- messagesReceived.updateAndGet(_ + 1)
Expand Down
7 changes: 4 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,9 @@ private[consumer] final class ConsumerLive private[consumer] (
): ZIO[R, Throwable, Any] =
partitionedStreamWithGracefulShutdown(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) {
partitionedStream =>
withStream(partitionedStream.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2))
withStream(
partitionedStream.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2)
)
}

/**
Expand All @@ -724,8 +726,7 @@ private[consumer] final class ConsumerLive private[consumer] (
for {
control <- streamControl
fib <- withStream(control.stream)
.onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen"))
.forkDaemon
.onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen")).forkScoped
result <-
fib.join.onInterrupt(
control.stop *> fib.join
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,16 @@ private[consumer] final class Runloop private (
case RunloopCommand.EndStreamsBySubscription(subscription, cont) =>
ZIO.foreachDiscard(
state.assignedStreams.filter(stream => Subscription.subscriptionMatches(subscription, stream.tp))
)(_.end) *> cont.succeed(()).as(state)
)(_.end) *> cont
.succeed(())
.as(
state.copy(
pendingRequests =
state.pendingRequests.filterNot(req => Subscription.subscriptionMatches(subscription, req.tp)),
assignedStreams =
state.assignedStreams.filterNot(stream => Subscription.subscriptionMatches(subscription, stream.tp))
)
)

case RunloopCommand.RemoveSubscription(subscription) =>
state.subscriptionState match {
Expand Down

0 comments on commit 6e25cd4

Please sign in to comment.