You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
After setting up an InMemoryWindowed Store using aggregation, I want to access the Store by doing a FetchAll() on the windowed store. However when trying to do this the Stream thread crashes and outputs the following error (complete stack trace is from Streamiz) :
"System.InvalidOperationException",
"Collection was modified; enumeration operation may not execute.",
\t at System.Collections.Generic.HashSet1.Enumerator.MoveNext() \n\t at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.RemoveExpiredData() \n\t at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.Fetch(Bytes key, Int64 time) \n\t at Streamiz.Kafka.Net.State.Metered.MeteredWindowStore2.<>c__DisplayClass17_0.b__0() \n\t at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func1 actionToMeasure, Sensor sensor) \n\t at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor4.Process(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor2.Forward(IEnumerable1 processors, Action1 action) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor2.Forward(IEnumerable1 processors, Action1 action) \n\t at Streamiz.Kafka.Net.Processors.SourceProcessor2.Process(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor2.Process(ConsumeResult`2 record) \n\t at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action) \n\t at Streamiz.Kafka.Net.Processors.StreamTask.Process() \n\t at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
This is specially relevant when there is lag build up on the source topic.
Can you please review this and let me know if there is a bug in the library and something in RemoveExpiredData() is not thread safe? Meaning race conditions may happen leading to a crash like above?
I refer to this guide https://lgouellec.github.io/kafka-streams-dotnet/stores.html#in-memory-window-store to look at samples and best practices while using Streamiz. Can you please update that or provide me a recommended example/sample of setting up the WindowedStore and accessing it for lookups? I want to understand the correct way to call FetchAll() without crashing the streams thread.
How to reproduce
Setup a sample topic. Pump some messages (around 80 or 90k) first. Make sure that the streams thread has to process some lag.
Description
After setting up an InMemoryWindowed Store using aggregation, I want to access the Store by doing a FetchAll() on the windowed store. However when trying to do this the Stream thread crashes and outputs the following error (complete stack trace is from Streamiz) :
"System.InvalidOperationException",
"Collection was modified; enumeration operation may not execute.",
\t at System.Collections.Generic.HashSet
1.Enumerator.MoveNext() \n\t at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.RemoveExpiredData() \n\t at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.Fetch(Bytes key, Int64 time) \n\t at Streamiz.Kafka.Net.State.Metered.MeteredWindowStore
2.<>c__DisplayClass17_0.b__0() \n\t at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func1 actionToMeasure, Sensor sensor) \n\t at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor
4.Process(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor2.Forward(IEnumerable
1 processors, Action1 action) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor
2.Forward(IEnumerable1 processors, Action
1 action) \n\t at Streamiz.Kafka.Net.Processors.SourceProcessor2.Process(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor
2.Process(ConsumeResult`2 record) \n\t at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action) \n\t at Streamiz.Kafka.Net.Processors.StreamTask.Process() \n\t at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)This is specially relevant when there is lag build up on the source topic.
Can you please review this and let me know if there is a bug in the library and something in RemoveExpiredData() is not thread safe? Meaning race conditions may happen leading to a crash like above?
I refer to this guide https://lgouellec.github.io/kafka-streams-dotnet/stores.html#in-memory-window-store to look at samples and best practices while using Streamiz. Can you please update that or provide me a recommended example/sample of setting up the WindowedStore and accessing it for lookups? I want to understand the correct way to call FetchAll() without crashing the streams thread.
How to reproduce
Program.txt
Checklist
Please provide the following information:
The text was updated successfully, but these errors were encountered: