Skip to content

Commit

Permalink
fixing how IGateway works
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet committed Nov 2, 2024
1 parent 2291cb5 commit 2a5e75a
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
2 changes: 2 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public interface IGateway : IGrainObserver
ValueTask BroadcastEvent(CloudEvent evt, CancellationToken cancellationToken = default);
ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default);
ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default);
Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default);

}
4 changes: 2 additions & 2 deletions dotnet/src/Microsoft.AutoGen/Agents/Services/Gateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class Gateway : BackgroundService, IGateway, IGrainWithIntegerKey

private readonly ConcurrentDictionary<string, List<InMemoryQueue<CloudEvent>>> _supportedAgentTypes = [];
private readonly ConcurrentDictionary<(string Type, string Key), InMemoryQueue<CloudEvent>> _agentDirectory = new();
private readonly ConcurrentDictionary<InMemoryQueue<CloudEvent>, InMemoryQueue<CloudEvent>> _workers = new();
public readonly ConcurrentDictionary<IConnection, IConnection> _workers = new();
private readonly ConcurrentDictionary<(InMemoryQueue<Message>, string), TaskCompletionSource<RpcResponse>> _pendingRequests = new();
private readonly InMemoryQueue<Message> _messageQueue = new();

Expand All @@ -44,7 +44,7 @@ public async ValueTask BroadcastEvent(CloudEvent evt, CancellationToken cancella
}

// intentionally not static
private async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
public virtual async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
{
var queue = (InMemoryQueue<CloudEvent>)connection;
await queue.Writer.WriteAsync(cloudEvent, cancellationToken).AsTask().ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ internal sealed class GrpcGateway : Gateway, IGateway
private readonly IAgentRegistry _gatewayRegistry;
private readonly IGateway _reference;

// The local mapping of agents to worker processes.
private readonly ConcurrentDictionary<GrpcWorkerConnection, GrpcWorkerConnection> _workers = new();
// The agents supported by each worker process.
private readonly ConcurrentDictionary<string, List<GrpcWorkerConnection>> _supportedAgentTypes = [];
// The mapping from agent id to worker process.
Expand All @@ -33,7 +31,7 @@ public GrpcGateway(IClusterClient clusterClient, ILogger<Gateway> logger) : base
_gatewayRegistry = clusterClient.GetGrain<IAgentRegistry>(0);
}
//intetionally not static so can be called by some methods implemented in base class
private async ValueTask SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
public override async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
{
var queue = (GrpcWorkerConnection)connection;
await queue.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false);
Expand Down

0 comments on commit 2a5e75a

Please sign in to comment.