System.InvalidOperationException when accessing InMemoryWindowed Store #314

Open
8 tasks
rao-mayur opened this issue Feb 16, 2024 · 1 comment

@rao-mayur

Description

After setting up an in-memory windowed store through an aggregation, I want to query it by calling FetchAll() on the windowed store. When I do this, the stream thread crashes with the following error (the complete stack trace comes from Streamiz):

"System.InvalidOperationException",
"Collection was modified; enumeration operation may not execute.",
   at System.Collections.Generic.HashSet`1.Enumerator.MoveNext()
   at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.RemoveExpiredData()
   at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.Fetch(Bytes key, Int64 time)
   at Streamiz.Kafka.Net.State.Metered.MeteredWindowStore`2.<>c__DisplayClass17_0.b__0()
   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func`1 actionToMeasure, Sensor sensor)
   at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
   at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
   at Streamiz.Kafka.Net.Processors.StreamTask.Process()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)

This happens especially when lag has built up on the source topic.

Can you please review this and let me know whether there is a bug in the library? In particular, is something in RemoveExpiredData() not thread-safe, so that a race condition can lead to a crash like the one above?
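
For context, here is a minimal sketch of the failure mode that the top frame of the stack trace points to: a HashSet that is modified while an enumerator over it is still in use. WindowIndex and all of its members are hypothetical stand-ins for illustration, not Streamiz's InMemoryWindowStore, and whether the library's set is really touched by two threads is exactly the open question in this report. The sketch triggers the exception deterministically on one thread, then shows one common mitigation, assuming all access can go through a single lock: copy the set under the lock and enumerate the copy.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a store that tracks window start times.
// This is NOT Streamiz code; it only illustrates the exception.
public sealed class WindowIndex
{
    private readonly HashSet<long> windowStarts = new HashSet<long>();
    private readonly object sync = new object();

    public void Add(long start)
    {
        lock (sync) { windowStarts.Add(start); }
    }

    // Unsafe: enumerates the live set, so any Add made while the loop
    // is running invalidates the enumerator.
    public void UnsafeVisit(Action<long> visit)
    {
        foreach (var start in windowStarts) visit(start);
    }

    // Safer: copy under the lock; callers enumerate the copy.
    public List<long> Snapshot()
    {
        lock (sync) { return new List<long>(windowStarts); }
    }

    // Expiry done in one call under the same lock, with no foreach over the live set.
    public int RemoveOlderThan(long cutoff)
    {
        lock (sync) { return windowStarts.RemoveWhere(s => s < cutoff); }
    }
}

public static class Program
{
    // Returns true if mutating the set during enumeration throws.
    public static bool UnsafeVisitThrows()
    {
        var index = new WindowIndex();
        index.Add(1);
        index.Add(2);
        try
        {
            index.UnsafeVisit(s => index.Add(s + 100));
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine($"Unsafe enumeration threw: {UnsafeVisitThrows()}");

        var index = new WindowIndex();
        index.Add(1);
        index.Add(2);
        // Enumerating a snapshot tolerates writes to the underlying set.
        foreach (var start in index.Snapshot()) index.Add(start + 100);
        Console.WriteLine($"Entries after snapshot enumeration: {index.Snapshot().Count}");
    }
}
```

The snapshot costs one copy per read. That is usually acceptable for occasional lookups, but a real fix inside the library may prefer a concurrent collection or finer-grained locking.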

I use this guide, https://lgouellec.github.io/kafka-streams-dotnet/stores.html#in-memory-window-store, as my reference for samples and best practices with Streamiz. Could you please update it, or provide a recommended example of setting up the windowed store and accessing it for lookups? I want to understand the correct way to call FetchAll() without crashing the streams thread.

How to reproduce

  1. Set up a sample topic and produce a batch of messages first (around 80,000 to 90,000), so that the streams thread has lag to work through.
  2. Run the sample streams topology:
    Program.txt
  3. After some time, the InvalidOperationException should appear and kill the stream thread.

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file. - included
  • A code snippet with your topology builder (ex: builder.Stream<string, string>("topic").to("an-another-topic")) - in the Program.txt
  • Streamiz.Kafka.Net nuget version. - 1.5.0-RC2
  • Apache Kafka version. - 3.5.x
  • Client configuration. - included in the Program
  • Operating system. - Linux
  • Provide logs (in debug mode, with log4net and StreamConfig.Debug set as necessary in the configuration). - included above
  • Critical issue. - yes, crashing the streams thread leading to app shutdown
@LGouellec
Owner

Hey @rao-mayur,

Sorry for the delay. I will try to reproduce the issue and fix it.
I will get back to you ASAP.
Regards,
