Consumer not reconnecting on lost session on proxy connectivity #1233

Open
fd8s0 opened this issue Apr 30, 2024 · 8 comments · Fixed by #1252

Comments

@fd8s0

fd8s0 commented Apr 30, 2024

This behaviour does not seem to occur without a Kafka proxy. When connecting directly to the broker, we observe that the stream always recovers by itself.

Example code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import zio._
import zio.kafka.consumer.{Consumer, ConsumerSettings, RebalanceListener, Subscription}
import zio.kafka.serde.{Deserializer, Serializer}

object ZioKafkaProxyTest extends ZIOAppDefault {

  private val Props = Map(
    ConsumerConfig.GROUP_ID_CONFIG                 -> "zio-kafka-test",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest"
  )

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
    for {
      consumer <- Consumer.make(
        ConsumerSettings(List("kafka-proxy:9192"))
          .withProperties(Props)
          .withPollTimeout(1.second)
          .withRebalanceListener(RebalanceListener(
            (assigned, _) => ZIO.logInfo(s"assigned: ${assigned.map(_.partition())}"),
            (revoked, _) => ZIO.logInfo(s"revoked: ${revoked.map(_.partition())}"),
            (lost, _) => ZIO.logInfo(s"lost: ${lost.map(_.partition())}"),
          ))
      )
      _ <-
        consumer.plainStream(Subscription.topics("topic"), Deserializer.string, Deserializer.string, 2)
          .mapZIO(m => ZIO.logInfo(m.value))
          .runDrain
    } yield ExitCode.success
  }

}

In version 2.3.2 it goes something like this:

timestamp=2024-04-30T16:58:09.273167Z level=INFO thread=#zio-fiber-19 message="assigned: Set(0)" 
proxy stopped
wait for session timeout
proxy start
timestamp=2024-04-30T16:58:40.757980Z level=INFO thread=#zio-fiber-90 message="lost: Set(0)" 
timestamp=2024-04-30T16:58:43.769893Z level=INFO thread=#zio-fiber-97 message="assigned: Set(0)"
stream resumes

In version 2.7.4 it goes:

timestamp=2024-04-30T15:26:46.500919Z level=INFO thread=#zio-fiber-18 message="assigned: Set(0)"
stopped proxy
wait for session timeout
started proxy
timestamp=2024-04-30T15:29:32.603663Z level=INFO thread=#zio-fiber-22 message="lost: Set(0)" 
hangs indefinitely
when stopping:
2024-04-30 16:31:57 ERROR ConsumerCoordinator:1201 - [Consumer clientId=consumer-zio-kafka-test-1, groupId=zio-kafka-test] LeaveGroup request with Generation{generationId=-1, memberId='consumer-zio-kafka-test-1-e2669255-c590-4ac7-9457-a585dfbf3969', protocol='null'} failed with error: The coordinator is not aware of this member.
@erikvanoosten
Collaborator

I suspect that this is since version 2.7.0, due to this change:

Always end streams in rebalance listener, support lost partitions #1089

What I do not understand is why it hangs indefinitely. If a partition is lost, the stream should be interrupted. This should immediately cause an error at the place where the stream is run.

@fd8s0
Author

fd8s0 commented Apr 30, 2024

2.6 resumes the stream, so it must be a 2.7+ issue.

erikvanoosten added a commit that referenced this issue Jun 10, 2024
Before 2.7.0 a lost partition was treated as a revoked partition. Since the partition is already assigned to another node, this potentially leads to duplicate processing of records.

Zio-kafka 2.7.0 assumes that a lost partition is a fatal event. It leads to an interrupt in the stream that handles the partition. The other streams are ended, and the consumer closes with an error. Usually, a full program restart is needed to resume consuming.

It should be noted that stream processing is not interrupted immediately. The interrupt is observed only when the stream requests new records. Unfortunately, we have not found a clean way to interrupt the stream consumer directly.

Meanwhile, from bug reports, we understand that partitions are usually lost when no records have been received for a long time.

In conclusion, 1) it is not possible to immediately interrupt user stream processing, and 2) it is most likely not needed anyway because the stream is awaiting new records.

With this change, a lost partition no longer leads to an interrupt. Instead, we first drain the stream's internal queue (just to be sure, it is probably already empty), and then we end it gracefully (that is, without error). Other streams are not affected, and the consumer will continue to work. When `rebalanceSafeCommits` is enabled, lost partitions do _not_ participate like revoked partitions do, so lost partitions cannot hold up a rebalance.

Fixes #1233 and #1250.
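
To make the difference between the two behaviours described in this commit message easier to compare, here is a minimal toy model in plain Scala. It is only a sketch of the described semantics, not zio-kafka's implementation: `LostPartitionModel`, `Policy`, `StreamState`, `Consumer` and `onLost` are all hypothetical names invented for illustration, and partitions are reduced to plain `Int` ids.

```scala
// Toy model only: NOT zio-kafka code. All names here are hypothetical.
object LostPartitionModel {
  sealed trait StreamState
  case object Running     extends StreamState
  case object Ended       extends StreamState // ended gracefully, without error
  case object Interrupted extends StreamState // ended with an error

  sealed trait Policy
  case object Fatal         extends Policy // 2.7.0 behaviour as described above
  case object EndGracefully extends Policy // behaviour with this change

  // One stream per partition id, plus a flag for "consumer closed with an error".
  final case class Consumer(streams: Map[Int, StreamState], failed: Boolean)

  def onLost(c: Consumer, lost: Set[Int], policy: Policy): Consumer =
    policy match {
      case Fatal =>
        // Streams of lost partitions are interrupted, all other streams are
        // ended, and the consumer closes with an error.
        val next = c.streams.map { case (p, _) =>
          val state: StreamState = if (lost(p)) Interrupted else Ended
          p -> state
        }
        Consumer(next, failed = true)
      case EndGracefully =>
        // Only the streams of lost partitions end, and they end without error.
        // Other streams and the consumer itself are left untouched.
        val next = c.streams.map { case (p, s) =>
          val state: StreamState = if (lost(p)) Ended else s
          p -> state
        }
        Consumer(next, failed = c.failed)
    }
}
```

In this model, losing partition 0 of a two-partition consumer under `Fatal` leaves no running stream and a failed consumer, while under `EndGracefully` partition 1 keeps running. The model deliberately leaves out the queue draining and the `rebalanceSafeCommits` interaction.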
@svroonland
Collaborator

@fd8s0 Since you seem to be able to reproduce the issue quite well, could you confirm that zio-kafka 2.7.5 fixes this issue?

There are a few upgrades of kafka-clients (apache kafka) versions in between zio-kafka 2.3.2 and 2.7.4: from 3.4.1 to 3.7.0. My suspicion is that besides the issue fixed in #1252 there is also a bug in newer kafka-clients that causes the connection not to be recovered. If we could try zio-kafka 2.7.5 with an older apache-kafka client, we can rule that out.

Would you be willing to try to reproduce the issue with the older kafka-clients by adding this to your dependencies? I'm not 100% sure if you can just override the version like that, but let's try.

"org.apache.kafka" % "kafka-clients" % "3.4.1"

If we could pinpoint the kafka-clients version that introduces the issue, that would be even nicer.
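
For reference, a minimal sketch of how the version could be pinned, assuming an sbt build. A plain `libraryDependencies` entry with a lower version may be evicted in favour of the higher version that zio-kafka pulls in transitively, so this sketch uses sbt's `dependencyOverrides` setting instead. The version numbers are the ones mentioned in this thread; whether zio-kafka 2.7.5 runs correctly against each older kafka-clients release is not something this fragment guarantees.

```scala
// build.sbt (sketch, assuming an sbt build)
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.7.5"

// Force the transitive kafka-clients dependency to the older version under test.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "3.4.1"
```

Running the `evicted` task in sbt afterwards is one way to check which kafka-clients version was actually selected.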

@fd8s0
Author

fd8s0 commented Jun 19, 2024

@svroonland It still seems broken in 2.7.5 for me.

I rolled back kafka-clients version by version, all the way back to 3.4.1, while staying on zio-kafka 2.7.5, and in no case does it work like it does in zio-kafka 2.6.

If you're having trouble replicating the issue I can try to help. I'm not sharing my exact setup because it relies on a ZooKeeper instance embedded inside HBase, which is a bit off-topic. I add this service to a Docker Compose file alongside the Kafka server:

kafka-proxy:
    container_name: kafka-proxy
    image: grepplabs/kafka-proxy:0.3.8
    command: server --bootstrap-server-mapping "kafka:9092,kafka-proxy:9192"
    network_mode: "host"

Connect the client to port 9192, stop the proxy container for over a minute, then start it again.

@svroonland
Collaborator

Thanks, I'm able to replicate this behavior now.

@svroonland
Collaborator

@erikvanoosten Sure this was intended to be closed by 1252?

erikvanoosten reopened this Jul 10, 2024
@erikvanoosten
Collaborator

> @erikvanoosten Sure this was intended to be closed by 1252?

Yes!

@erikvanoosten
Collaborator

Re-opened after discussion. We first want to see if #1252 really helped solve this issue.
