
Partition lost not recovering, possible issue with RunLoop #1250

Closed
josdirksen opened this issue Jun 7, 2024 · 2 comments

Comments

@josdirksen

This might be related to issue #1233, but over the last couple of weeks/months we have seen cases where, after a partition is lost, it isn't recovered correctly. We've tried to analyze and debug it, but this occurs so infrequently that we haven't been able to isolate it.

By analyzing the code we think we have identified the reason, but there is so much async stuff happening there that we might be interpreting it wrongly.

What happens in our case is the following:

  1. Services are consuming from n partitions. Note that these are partitions for a topic that has intervals with little to no traffic.
  2. At a certain point, usually in the middle of the night, there is some network issue and the kafka-client identifies the partitions as lost.
  3. We see these lost partitions in the logging, but they aren't recovered, and we see no errors in our own client code indicating that the streams have failed.

For partitions that are revoked everything seems to be working correctly though.

What we see as a possible cause is the following. In the Runloop, this happens for lost partitions:

      onLost = lostTps =>
        for {
          _              <- ZIO.logDebug(s"${lostTps.size} partitions are lost")
          rebalanceEvent <- lastRebalanceEvent.get
          state          <- currentStateRef.get
          lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
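          // Fail each lost stream's interruptionPromise (see PartitionStreamControl.lost below)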
          _ <- ZIO.foreachDiscard(lostStreams)(_.lost)
          _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps))
          _ <- ZIO.logTrace(s"onLost done")
        } yield ()

Resulting in this call in the PartitionStreamControl:

  private[internal] def lost: UIO[Boolean] = {
    val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace
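    // Failing the promise only has an effect if some fiber awaits or polls it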
    interruptionPromise.fail(lostException)
  }

Looking at the way the interruptionPromise is handled, this doesn't seem to work correctly when there are no records to be processed. In PartitionStreamControl we've got this repeating effect:

                 ZStream.repeatZIOChunk {
                   // First try to take all records that are available right now.
                   // When no data is available, request more data and await its arrival.
                   dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
                 }.flattenTake.chunksWith { s =>
                   s.tap(records => registerPull(queueInfo, records))
                     // Due to https://github.com/zio/zio/issues/8515 we cannot use Zstream.interruptWhen.
                     .mapZIO(chunk => interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk))
                 }

And here the interruptionPromise is checked to see if we need to interrupt this effect. But how would this work if there are no active chunks to process? The requestAndAwaitData function:

      requestAndAwaitData =
        for {
          _     <- commandQueue.offer(RunloopCommand.Request(tp))
          _     <- diagnostics.emit(DiagnosticEvent.Request(tp))
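          // Suspends here until at least one record arrives; the interruptionPromise is never consulted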
          taken <- dataQueue.takeBetween(1, Int.MaxValue)
        } yield taken

blocks the current fiber until at least one element is taken. So when the lost function fails the promise, that failure is never observed, since there are no records coming in on the dataQueue (or I'm reading this wrong, which is of course also possible).

For the revoke flow, the dataQueue gets an additional Take.end to get out of the requestAndAwaitData wait state. But that doesn't happen for the lost scenario.

So, shouldn't the code for lost also make sure the dataQueue receives at least some value? Otherwise it seems to be stuck in the requestAndAwaitData loop indefinitely.
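To make the suspected hang concrete, here is a minimal, self-contained sketch in plain ZIO 2 (not zio-kafka code; dataQueue and interruptionPromise just mirror the names above). A fiber suspended on an empty queue never notices a promise that is failed afterwards, because nothing connects the two:

  import zio._

  object StuckTakeSketch extends ZIOAppDefault {
    val run =
      for {
        dataQueue           <- Queue.unbounded[Int]
        interruptionPromise <- Promise.make[Throwable, Nothing]
        // Like requestAndAwaitData: suspends until at least one element arrives.
        fiber <- dataQueue.takeBetween(1, Int.MaxValue).fork
        _     <- Clock.sleep(100.millis)
        _     <- interruptionPromise.fail(new RuntimeException("Partition was lost"))
        _     <- Clock.sleep(100.millis)
        // Still suspended: failing the promise did not affect the blocked take.
        _     <- fiber.status.flatMap(status => ZIO.debug(s"fiber status: $status"))
        _     <- fiber.interrupt
      } yield ()
  }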

@josdirksen changed the title from "Partition lost not recovering, possible issue with" to "Partition lost not recovering, possible issue with RunLoop" on Jun 7, 2024
@erikvanoosten
Collaborator

erikvanoosten commented Jun 7, 2024

Thanks @josdirksen, that is some awesome spelunking there. I think you have found all the right places in the code, and your analysis seems correct.

We can fix requestAndAwaitData by racing it with interruptionPromise.await:

      requestAndAwaitData =
        for {
          _     <- commandQueue.offer(RunloopCommand.Request(tp))
          _     <- diagnostics.emit(DiagnosticEvent.Request(tp))
          taken <- dataQueue
                     .takeBetween(1, Int.MaxValue)
                     .race(interruptionPromise.await)  // <-- added race here
        } yield taken

should do it.
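To illustrate why the race works: interruptionPromise.await fails with the lost-partition exception, so the raced take fails immediately instead of suspending on the empty queue. A plain ZIO 2 sketch (hypothetical names, not zio-kafka code):

  import zio._

  val raceDemo: UIO[Unit] =
    for {
      queue   <- Queue.unbounded[Int]
      promise <- Promise.make[Throwable, Nothing]
      _       <- promise.fail(new RuntimeException("Partition was lost"))
      // The raced take fails right away instead of blocking on the empty queue.
      exit    <- queue.takeBetween(1, Int.MaxValue).race(promise.await).exit
      _       <- ZIO.debug(s"result: $exit") // Exit.Failure(... Partition was lost ...)
    } yield ()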

However, I am beginning to wonder if we should fail the stream like this at all! It seems that lost partitions are more common than we thought. I was always working under the assumption that lost partitions are 'end of the world' type situations, e.g. network splits, or the network being lost for a long time, where any processing that is still going on should be aborted ASAP.

Perhaps we should return to the situation we had before, where we treated a lost partition the same as a revoked partition. OR, we could treat it as a revoked partition when the internal queues are empty anyway... 🤔

erikvanoosten added a commit that referenced this issue Jun 7, 2024
When a partition is lost while there is no traffic, `PartitionStreamControl` is blocked waiting for data. We fix this by racing with the `interruptionPromise`. (Thanks @josdirksen for the analysis!)

Note: this situation can only occur with lost partitions. Timeouts (the other reason for interrupts) do not occur when there is no traffic. Currently, we have no way to test lost partitions. Therefore, there are no added tests.

See #1250.
erikvanoosten added a commit that referenced this issue Jun 10, 2024
When a partition is lost while there is no traffic,
`PartitionStreamControl` is blocked waiting for data. We fix this by
racing with the `interruptionPromise`. (Thanks @josdirksen for the
analysis! See #1250.)

Note: this situation can only occur with lost partitions. Timeouts (the
other reason for interrupts) do not occur when there is no traffic.
Currently, we have no way to test lost partitions. Therefore, there are
no added tests.

This PR does _not_ change how lost partitions are handled. That is, the
stream for the partition that is lost is interrupted, the other streams
are closed gracefully, the consumer aborts with an error.
erikvanoosten added a commit that referenced this issue Jun 10, 2024
Before 2.7.0 a lost partition was treated as a revoked partition. Since the partition is already assigned to another node, this potentially leads to duplicate processing of records.

Zio-kafka 2.7.0 assumes that a lost partition is a fatal event. It leads to an interrupt in the stream that handles the partition. The other streams are ended, and the consumer closes with an error. Usually, a full program restart is needed to resume consuming.

It should be noted that stream processing is not interrupted immediately. The interrupt is only observed when the stream requests new records. Unfortunately, we have not found a clean way to interrupt the stream consumer directly.

Meanwhile, from bug reports, we understand that partitions are usually lost when no records have been received for a long time.

In conclusion, 1) it is not possible to immediately interrupt user stream processing, and 2) it is most likely not needed anyway because the stream is awaiting new records.

With this change, a lost partition no longer leads to an interrupt. Instead, we first drain the stream's internal queue (just to be sure, it is probably already empty), and then we end it gracefully (that is, without error). Other streams are not affected, the consumer will continue to work. When `rebalanceSafeCommits` is enabled, lost partitions do _not_ participate like revoked partitions do. So lost partitions cannot hold up a rebalance.

Fixes #1233 and #1250.
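For illustration, the "drain, then end gracefully" approach could look roughly like the sketch below inside PartitionStreamControl. This is an assumption-laden sketch based only on the commit text and the snippets above (reusing the Take.end signal the revoke flow already uses); it is not the actual commit:

  // Hypothetical sketch (not the actual commit): end a lost partition's
  // stream gracefully instead of failing the interruptionPromise.
  private[internal] def lost: UIO[Unit] =
    for {
      _ <- dataQueue.takeAll         // drain anything still buffered (probably empty already)
      _ <- dataQueue.offer(Take.end) // graceful end-of-stream, no failure
    } yield ()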
erikvanoosten added a commit that referenced this issue Jul 9, 2024
Before 2.7.0 a lost partition was treated as a revoked partition. Since
the partition is already assigned to another node, this potentially
leads to duplicate processing of records.

Zio-kafka 2.7.0 assumes that a lost partition is a fatal event. It leads
to an interrupt in the stream that handles the partition. The other
streams are ended, and the consumer closes with an error. Usually, a
full program restart is needed to resume consuming.

It should be noted that stream processing is not interrupted
immediately. The interrupt is only observed when the stream requests new
records. Unfortunately, we have not found a clean way to interrupt the
stream consumer directly.

Meanwhile, from bug reports (#1233, #1250), we understand that
partitions are usually lost when no records have been received for a
long time.

In conclusion, 1) it is not possible to immediately interrupt user
stream processing, and 2) it is most likely not needed anyway because
the stream is already done processing and awaiting more records.

With this change, a lost partition no longer leads to an interrupt.
Instead, we first drain the stream's internal queue (just to be sure, it
is probably already empty), and then we end the stream gracefully (that
is, without error, like we do with revoked partitions). Other streams
are not affected, the consumer will continue to work.

Lost partitions do not affect the features `rebalanceSafeCommits` and
`restartStreamsOnRebalancing`; they do _not_ hold up a rebalance waiting
for commits to complete, and they do _not_ lead to restarts of other
streams.

Clients that want to handle the partition-lost event somehow should no
longer handle the failed stream; instead, they need to create their own
`RebalanceListener` and handle the `onLost` call.

Fixes #1233 and #1250.
svroonland added a commit that referenced this issue Oct 29, 2024
Fixes #1288. See also #1233 and #1250.

When all partitions are lost after some connection issue to the broker,
the streams for lost partitions are ended but polling stops, due to the
conditions in `Runloop.State#shouldPoll`. This PR fixes this by removing
the lost partition streams from the `assignedStreams` in the state,
thereby not disabling polling.

Also adds a warning that is logged whenever the assigned partitions
(according to the apache kafka consumer) are different from the assigned
streams, which helps to identify other issues or any future regressions
of this issue.

~~Still needs a good test, the `MockConsumer` used in other tests
unfortunately does not allow simulating lost partitions, and the exact
behavior of the kafka client in this situation is hard to predict.~~
Includes a test that fails when undoing the change to Runloop.
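Based only on that description, the shape of the fix is roughly the sketch below. The names (currentStateRef, assignedStreams) are taken from the onLost snippet earlier in this issue, while State.copy and setting the state here are assumptions; the added warning about mismatched assignments is omitted:

      // Hypothetical sketch based on the commit description: end the lost
      // streams AND drop them from assignedStreams so shouldPoll stays true.
      onLost = lostTps =>
        for {
          state <- currentStateRef.get
          lostStreams      = state.assignedStreams.filter(control => lostTps.contains(control.tp))
          remainingStreams = state.assignedStreams.filterNot(control => lostTps.contains(control.tp))
          _ <- ZIO.foreachDiscard(lostStreams)(_.lost)
          // Keep only the healthy streams so polling is not disabled.
          _ <- currentStateRef.set(state.copy(assignedStreams = remainingStreams))
          _ <- ZIO.logTrace("onLost done")
        } yield ()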
@erikvanoosten
Collaborator

@josdirksen Version 2.8.3 no longer fails on lost partitions and properly continues polling. I am going to close this issue. Please re-open it or create a new one if you still have problems.
