ConcurrentModificationException in the release effect #1238

Closed
TobiasPfeifer opened this issue May 8, 2024 · 9 comments · Fixed by #1365

Comments

@TobiasPfeifer

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: zio-default-blocking-4, id: 164) otherThread(id: 59)
  		at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484)
  		at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2343)
  		at zio.kafka.consumer.internal.ConsumerAccess$.$anonfun$make$5(ConsumerAccess.scala:62)
  		at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  		at zio.ZIOCompanionVersionSpecific.$anonfun$attempt$1(ZIOCompanionVersionSpecific.scala:100)
  		at zio.kafka.consumer.internal.ConsumerAccess.make(ConsumerAccess.scala:62)
  		at zio.kafka.consumer.internal.ConsumerAccess.make(ConsumerAccess.scala:61)

It seems that the KafkaConsumer must be closed from the same thread that used it, which, AFAIK, ZIO.acquireRelease does not guarantee.

I'm providing the layers like this:

effect.provide(
  Kafka.embedded,
  KafkaTestUtils.producer,
  KafkaTestUtils.consumer(workerName)
)

@erikvanoosten
Collaborator

erikvanoosten commented May 15, 2024

Hi @TobiasPfeifer. Thanks for the bug report.
How often do you see this? Which zio version are you using? Which zio-kafka version?

The problem is that the consumer is still in use when the release effect (closing the underlying java consumer) is executed. My guess is that during shutdown, the task that is using the consumer is not interrupted.

There is some funky code in ConsumerAccess.withConsumerNoPermit that should manage the interruption. @svroonland do you remember how this works? It reminds me of the non-deterministic shutdown we were looking at recently.

@svroonland
Collaborator

It's not that funky, it just forks it so it can interrupt it by calling wakeup() ;)

I agree with your idea that something is still using the consumer at the time when close() is called. I would expect everything to be using ZIO scopes where necessary, so all resources should be released in a proper order. Makes me wonder if some forked fiber is not interrupted, perhaps in the user's code.

@TobiasPfeifer Could you share some code that generates this error?

@svroonland
Collaborator

svroonland commented Nov 3, 2024

Maybe this is due to Runloop#shutdown enqueueing a series of commands but not awaiting them, in the finalizers defined in Runloop.make. Those commands include RemoveAllSubscriptions, which would call unsubscribe. There could be a race condition with close()'ing the KafkaConsumer when the rest of the application is finalized.

@svroonland
Collaborator

Nope, ignore that, the final command is StopRunloop, which results in runloop.run completing, which is awaited in the Runloop.make finalizer. It means all the previous commands have executed.

@svroonland
Collaborator

> It seems that the KafkaConsumer must be closed from the same thread which is not guaranteed with ZIO.acquireRelease AFAIK

That is not a correct conclusion; the error is about two threads concurrently using the KafkaConsumer.
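
To illustrate the distinction: a check of this kind tracks which thread is currently inside a call, not which thread created the object. The Java sketch below is a simplified model of such a guard, not the actual Kafka source. The names OwnerGuard and OwnerGuardDemo are made up for illustration.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of a single-owner guard (an assumption, not Kafka's code).
final class OwnerGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    // Called at the start of every guarded operation.
    void acquire() {
        long me = Thread.currentThread().getId();
        // Re-entry by the current owner is fine; otherwise the guard must be free.
        if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "not safe for multi-threaded access, otherThread(id: " + owner.get() + ")");
        }
        depth.incrementAndGet();
    }

    // Called at the end of every guarded operation.
    void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }
}

final class OwnerGuardDemo {
    // Runs acquire() (and release() on success) on a fresh thread.
    // Returns true if acquire() threw ConcurrentModificationException.
    static boolean acquireFailsOnOtherThread(OwnerGuard guard) {
        boolean[] failed = {false};
        Thread t = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                failed[0] = true;
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return failed[0];
    }

    public static void main(String[] args) {
        OwnerGuard guard = new OwnerGuard();
        // Nobody is inside the guard, so use from another thread is accepted.
        System.out.println(acquireFailsOnOtherThread(guard)); // prints false
        guard.acquire(); // the main thread is now "inside" an operation
        // Overlapping use from another thread is rejected.
        System.out.println(acquireFailsOnOtherThread(guard)); // prints true
        guard.release();
    }
}
```

In this model, using the object from a different thread is fine as long as the calls do not overlap. Only overlapping calls trigger the exception.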

All access from zio-kafka's Consumer to the underlying KafkaConsumer is guarded by a Semaphore with just 1 permit, so there should not be any concurrent access. There are two exceptions:

  • The call to wakeup when a ZIO effect is interrupted, but that is documented to be thread-safe.
  • When rebalanceSafeCommits is used, access from the RebalanceListener is not guarded.

Since we haven't seen this issue occur ourselves and don't know which version it occurred with or how to reproduce it, I propose we close this issue. Should it occur for anyone at a later time, or should more details pop up, it can of course be reopened.

@svroonland
Collaborator

svroonland commented Nov 9, 2024

Observed another one in tests for #1201 (see https://github.com/zio/zio-kafka/actions/runs/11755707706/job/32751057919?pr=1201):

2024-11-09T11:06:23.8841473Z 11:06:23.880 [zio-kafka-runloop-thread-31] [] ERROR zio.kafka.consumer.internal.Runloop - Error in Runloop
2024-11-09T11:06:23.8843968Z zio.FiberFailure: KafkaConsumer is not safe for multi-threaded access. currentThread(name: zio-kafka-runloop-thread-31, id: 212) otherThread(id: 112)
2024-11-09T11:06:23.8846382Z 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquire(LegacyKafkaConsumer.java:1227)
2024-11-09T11:06:23.8848601Z 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquireAndEnsureOpen(LegacyKafkaConsumer.java:1208)
2024-11-09T11:06:23.8850730Z 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.assignment(LegacyKafkaConsumer.java:399)
2024-11-09T11:06:23.8852797Z 	at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:634)
2024-11-09T11:06:23.8854353Z 	at zio.kafka.consumer.internal.Runloop.$anonfun$handlePoll$7(Runloop.scala:487)
2024-11-09T11:06:23.8855935Z 	at zio.kafka.consumer.internal.ConsumerAccess.runloopAccess(ConsumerAccess.scala:38)
2024-11-09T11:06:23.8857657Z 	at zio.kafka.consumer.internal.Runloop.$anonfun$handlePoll$6(Runloop.scala:485)
2024-11-09T11:06:23.8859093Z 	at zio.kafka.consumer.internal.Runloop.handlePoll(Runloop.scala:483)
2024-11-09T11:06:23.8860554Z 	at zio.kafka.consumer.internal.Runloop.run(Runloop.scala:789)
2024-11-09T11:06:23.8862086Z 	at zio.kafka.consumer.internal.Runloop.StreamOps.runFoldChunksDiscardZIO(Runloop.scala:833)
2024-11-09T11:06:23.8865298Z Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: zio-kafka-runloop-thread-31, id: 212) otherThread(id: 112)
2024-11-09T11:06:23.8868107Z 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquire(LegacyKafkaConsumer.java:1227)
2024-11-09T11:06:23.8870644Z 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquireAndEnsureOpen(LegacyKafkaConsumer.java:1208)
2024-11-09T11:06:23.8872935Z 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.assignment(LegacyKafkaConsumer.java:399)
2024-11-09T11:06:23.8875003Z 	at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:634)
2024-11-09T11:06:23.8876630Z 	at zio.kafka.consumer.internal.Runloop.$anonfun$handlePoll$7(Runloop.scala:487)
2024-11-09T11:06:23.8878208Z 	at zio.kafka.consumer.internal.ConsumerAccess.runloopAccess(ConsumerAccess.scala:38)
2024-11-09T11:06:23.8879975Z 	at zio.kafka.consumer.internal.Runloop.$anonfun$handlePoll$6(Runloop.scala:485)
2024-11-09T11:06:23.8881373Z 	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1077)
2024-11-09T11:06:23.8882575Z 	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1067)
2024-11-09T11:06:23.8883920Z 	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1097)
2024-11-09T11:06:23.8885150Z 	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1097)
2024-11-09T11:06:23.8890664Z 	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1097)
2024-11-09T11:06:23.8892094Z 	at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:413)
2024-11-09T11:06:23.8893509Z 	at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:489)
2024-11-09T11:06:23.8895181Z 	at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:250)
2024-11-09T11:06:23.8896508Z 	at zio.internal.FiberRuntime.run(FiberRuntime.scala:138)
2024-11-09T11:06:23.8897939Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
2024-11-09T11:06:23.8899811Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
2024-11-09T11:06:23.8901201Z 	at java.base/java.lang.Thread.run(Thread.java:1583)

Still no clue what is causing that. The call to assignment() is guarded by the semaphore and is not performed during a poll, so this has nothing to do with the rebalance listener.

It's fixed by this: 5ef97ef

@svroonland
Collaborator

It's not at all unlikely that there is some race condition inside the Apache Kafka consumer itself.

@erikvanoosten
Collaborator

> It's not at all unlikely that there is some race condition inside the Apache Kafka consumer itself.

Indeed. Wouldn't surprise me anymore. It would explain spurious ConcurrentModificationExceptions in my previous team's f2s-kafka project.

@svroonland
Collaborator

> All access from zio-kafka's Consumer to the underlying KafkaConsumer is guarded by a Semaphore with just 1 permit, so there should not be any concurrent access. There are two exceptions

Okay, I was wrong there. There is a third exception: the close() call is not guarded by a semaphore, since #1011. It's possible that something in the Runloop is still executing commands, perhaps a commit, after shutdown. We should try to fix that, but it might be safer here to restore the semaphore guard. That means there will be a bit more duplicated code, but that's fine. I'll create a PR.
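
A minimal sketch of what guarding close() with the same permit buys, written with a plain java.util.concurrent.Semaphore rather than the ZIO Semaphore that zio-kafka uses. FakeClient, GuardedAccess, and GuardedAccessDemo are hypothetical names, and this is not the code from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical stand-in for a client that is not safe for concurrent use.
final class FakeClient {
    volatile boolean closed = false;

    String assignment() {
        if (closed) throw new IllegalStateException("client already closed");
        return "topic-0";
    }

    void close() { closed = true; }
}

// Every call, including close(), must hold the single permit.
final class GuardedAccess {
    private final Semaphore permit = new Semaphore(1);
    private final FakeClient client;

    GuardedAccess(FakeClient client) { this.client = client; }

    <A> A withClient(Function<FakeClient, A> f) {
        permit.acquireUninterruptibly();
        try {
            return f.apply(client);
        } finally {
            permit.release();
        }
    }

    // Because close() takes the permit too, it waits for any in-flight call.
    void close() {
        withClient(c -> { c.close(); return null; });
    }
}

final class GuardedAccessDemo {
    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    static void join(Thread t, long millis) {
        try {
            t.join(millis); // 0 means wait without a timeout
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if close() took effect only after the in-flight call finished.
    static boolean closeWaitsForInFlightCall() {
        FakeClient client = new FakeClient();
        GuardedAccess access = new GuardedAccess(client);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFinish = new CountDownLatch(1);
        AtomicBoolean closedUnderneath = new AtomicBoolean(false);

        // The worker holds the permit until mayFinish is released.
        Thread worker = new Thread(() -> access.withClient(c -> {
            started.countDown();
            await(mayFinish);
            closedUnderneath.set(c.closed);
            return c.assignment();
        }));
        worker.start();
        await(started);

        Thread closer = new Thread(access::close);
        closer.start();
        join(closer, 200); // the closer is still blocked on the permit here
        boolean closedEarly = client.closed;

        mayFinish.countDown();
        join(worker, 0);
        join(closer, 0);
        return !closedEarly && !closedUnderneath.get() && client.closed;
    }

    public static void main(String[] args) {
        System.out.println(closeWaitsForInFlightCall()); // prints true
    }
}
```

If close() bypassed the permit, the closer thread could run while the worker was still inside its call, which is the overlap the exception reports.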
