ObserverManager needs a ConcurrentDictionary #9242

Open
turowicz opened this issue Nov 21, 2024 · 5 comments

@turowicz

We keep getting errors like this in a high client density deployment:

 System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
   at System.Collections.Generic.Dictionary`2.Enumerator.MoveNext()
   at Orleans.Utilities.ObserverManager`2.Notify(Func`2 notification, Func`2 predicate) in /_/src/Orleans.Core/Utils/ObserverManager.cs:line 156

It seems like an easy fix that we would love to see in Orleans 8.x. Please change the field _observers to ConcurrentDictionary<>.
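
For context on the request, here is a minimal standalone sketch (not Orleans code) of the behavioral difference between the two collection types. The class name `EnumerationDemo`, the method name, and the keys are hypothetical and chosen only for illustration. It adds one entry while a `foreach` is in progress: `Dictionary<,>` invalidates its enumerator on an add, while `ConcurrentDictionary<,>` allows enumeration alongside modification.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class EnumerationDemo
{
    // Returns true if adding an entry in the middle of a foreach causes
    // the enumerator to throw InvalidOperationException.
    public static bool ThrowsWhenEntryAddedDuringEnumeration(IDictionary<string, int> observers)
    {
        observers["a"] = 1;
        observers["b"] = 2;
        observers["c"] = 3;

        var added = false;
        var visited = 0;
        try
        {
            foreach (var entry in observers)
            {
                visited++;
                if (!added)
                {
                    // Simulates a subscriber arriving while the loop is running.
                    observers.Add("late-joiner", 0);
                    added = true;
                }
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Passing a `new Dictionary<string, int>()` should return `true`, and passing a `new ConcurrentDictionary<string, int>()` should return `false`. Note that the `ConcurrentDictionary<,>` enumerator is not a point-in-time snapshot, so an entry added mid-loop may or may not be visited.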

cc @ReubenBond

@ReubenBond
Member

Are you able to share some code? This doesn't seem right - there should be no concurrent modifications. Is it 8.2.0+ or earlier? Changing the impl to use concurrent dictionary is ok, but I'd like to make sure we understand the cause first.

@turowicz
Author

turowicz commented Nov 22, 2024

Orleans 8.2.0

So I have a Speed Trap system that contains a UnitGrain, which accepts an image frame (from a camera stream) when a violation happens and publishes it to all observers. The observer list changes constantly as users jump in and out of the system, which provides real-time alerts when violations occur. So I imagine that if a user hops in or out of the system while ObserverManager is iterating _observers with a foreach, that error will occur.

Code is as follows:

People subscribe in a SignalR hub:

var unitKind = (UnitKind)unit.Kind;
var grain = GetUnitGrain(_siloClient, unitKind, key);
var observer = new UnitGrainObserver(Context.ConnectionId, NotifySubscriberAsync);
var reference = _siloClient.CreateObjectReference<IUnitGrainObserver>(observer);
if (feeds.TryAdd(key, unitKind))
{
    await grain.SubscribeAsync(Context.ConnectionId, reference);
}

That then goes to the Grain:

public virtual Task SubscribeAsync(string key, IUnitGrainObserver observer)
{
    using (Logger.Start($"{GrainType}", $"{nameof(SubscribeAsync)}(key,observer)"))
    {
        previewSubscriptions.Subscribe(key, observer);
        MetricsLogger.GaugeInc(GraphMetrics.Viewers, Unit.Id.ToString(), Unit.Name);
        return Task.CompletedTask;
    }
}

Unsubscribe happens the same way, but it also auto-unsubscribes when a WebSocket connection dies.

public override async Task OnDisconnectedAsync(Exception exception)
{
    try
    {
        if (_connections.TryGetValue(Context.ConnectionId, out var feeds))
        {
            // Unsubscribe this connection from every feed it was watching.
            foreach (var feed in feeds.Distinct())
            {
                var split = feed.Key.Split('/');
                await Unsubscribe(split[1], split[2]);
            }
        }
    }
    // ... (remainder of the handler not shown)
}

The UnitGrainObserver itself looks like this:

public class UnitGrainObserver : IUnitGrainObserver
{
    private readonly string _connectionId;
    private readonly Func<string, Immutable<ImmutableUnitContext>, Immutable<IEnumerable<IData>>, Task> _onReceive;
    private bool _isDisposed;
    public UnitGrainObserver(string connectionId, Func<string, Immutable<ImmutableUnitContext>, Immutable<IEnumerable<IData>>, Task> onReceive)
    {
        _onReceive = onReceive;
        _connectionId = connectionId;
    }
    public async Task ReceiveAsync(Immutable<ImmutableUnitContext> context, Immutable<IEnumerable<IData>> data)
    {
        if (!_isDisposed)
        {
            await _onReceive(_connectionId, context, data);
        }
    }
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            _isDisposed = true;
        }
    }
}

And the way we notify all observers is just a single call:

previewSubscriptions = CreatePreviewSubscriptions(31);
...
using (Logger.Start($"{GrainType}", $"{nameof(StreamPreviewAsync)}(context).Notify"))
{
    await previewSubscriptions.Notify(async s =>
    {
        using (Logger.Start($"{GrainType}", $"{nameof(StreamPreviewAsync)}(context).Notify.Payload"))
        {
            var payload = message.Context.Clone();
            await s.ReceiveAsync(new Immutable<ImmutableUnitContext>(payload), new Immutable<IEnumerable<IData>>(message.Data));
        }
        MetricsLogger.CounterInc(GraphMetrics.FrameCounter, GrainType, "out", "preview", Unit.Id.ToString());
    });
}
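
The suspected failure mode can be sketched in isolation as follows. This is a hypothetical standalone repro, not the ObserverManager implementation: `NotifyLoopSketch` and all of its members are invented for illustration, and whether anything in the grain actually allows this interleaving is not established here. The sketch only shows that a loop which awaits per observer over a live `Dictionary<,>` fails if an entry is added while it is suspended, and that iterating over a copy avoids the exception.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class NotifyLoopSketch
{
    // Iterates the live dictionary and awaits once per observer.
    public static async Task NotifyLive(Dictionary<string, int> observers, Func<string, Task> notification)
    {
        foreach (var entry in observers)
        {
            await notification(entry.Key);
        }
    }

    // Same loop, but over a copy taken before the first await.
    public static async Task NotifySnapshot(Dictionary<string, int> observers, Func<string, Task> notification)
    {
        foreach (var entry in observers.ToArray())
        {
            await notification(entry.Key);
        }
    }

    // Returns true if the loop fails when a subscriber is added while it is suspended.
    public static async Task<bool> FailsWhenSubscriberArrivesMidLoop(bool useSnapshot)
    {
        var observers = new Dictionary<string, int> { ["a"] = 1, ["b"] = 2 };
        var gate = new TaskCompletionSource();

        // The loop starts, notifies the first observer, and suspends on the gate.
        Task loop = useSnapshot
            ? NotifySnapshot(observers, _ => gate.Task)
            : NotifyLive(observers, _ => gate.Task);

        // A new subscriber arrives while the loop is suspended.
        observers.Add("c", 3);

        // Let the loop resume and move to the next observer.
        gate.SetResult();
        try
        {
            await loop;
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

With `useSnapshot: false` the method should return `true`, and with `useSnapshot: true` it should return `false`. The snapshot variant trades one array allocation per notification round for immunity to changes made during the round; observers added mid-round are simply picked up on the next one.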

@ReubenBond
Member

Is reentrancy involved anywhere?

@turowicz
Author

No reentrancy in the entire codebase

@scalalang2
Contributor

👀
