Skip to content

Commit

Permalink
getting rid of even more
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet committed Nov 9, 2024
1 parent 910e3a5 commit 1b3bf38
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 318 deletions.
13 changes: 0 additions & 13 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRegistry.cs

This file was deleted.

15 changes: 0 additions & 15 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IGateway.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -39,8 +38,8 @@ public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder b
if (useGrpc)
{
builder.Services.AddGrpc();
builder.Services.AddSingleton<IGateway, GrpcGateway>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<IGateway>());
builder.Services.AddSingleton<GrpcGateway>();
builder.Services.AddSingleton<IHostedService>(sp => (IHostedService)sp.GetRequiredService<GrpcGateway>());
}

return builder;
Expand Down
132 changes: 0 additions & 132 deletions dotnet/src/Microsoft.AutoGen/Agents/Services/Gateway.cs

This file was deleted.

69 changes: 63 additions & 6 deletions dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,49 @@
using System.Collections.Concurrent;
using Grpc.Core;
using Microsoft.AutoGen.Abstractions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Microsoft.AutoGen.Agents;

internal sealed class GrpcGateway : Gateway, IGateway, IGrainObserver
public sealed class GrpcGateway : BackgroundService, IGrainObserver
{
private readonly ILogger<Gateway> _logger;
private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30);
private readonly ILogger<GrpcGateway> _logger;
private readonly IClusterClient _clusterClient;
private readonly IAgentRegistry _gatewayRegistry;
private readonly IGateway _reference;
private readonly ConcurrentDictionary<string, AgentState> _agentState = new();
private readonly RegistryGrain _gatewayRegistry;
private readonly GrpcGateway _reference;
// The agents supported by each worker process.
private readonly ConcurrentDictionary<string, List<GrpcWorkerConnection>> _supportedAgentTypes = [];
public readonly ConcurrentDictionary<IConnection, IConnection> _workers = new();

// The mapping from agent id to worker process.
private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new();
// RPC
private readonly ConcurrentDictionary<(GrpcWorkerConnection, string), TaskCompletionSource<RpcResponse>> _pendingRequests = new();
// InMemory Message Queue

public GrpcGateway(IClusterClient clusterClient, ILogger<Gateway> logger, IAgentRegistry gatewayRegistry) : base(logger, gatewayRegistry)
public GrpcGateway(IClusterClient clusterClient, ILogger<GrpcGateway> logger, RegistryGrain gatewayRegistry)
{
_logger = logger;
_clusterClient = clusterClient;
_reference = clusterClient.CreateObjectReference<GrpcGateway>(this);
_gatewayRegistry = clusterClient.GetGrain<RegistryGrain>(0);
}
public async ValueTask BroadcastEvent(CloudEvent evt, CancellationToken cancellationToken = default)
{
// TODO: filter the workers that receive the event
var tasks = new List<Task>(_workers.Count);
foreach (var (_, connection) in _supportedAgentTypes)
{

tasks.Add(this.SendMessageAsync((IConnection)connection[0], evt, default));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
//intetionally not static so can be called by some methods implemented in base class
public override async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
public async Task SendMessageAsync(IConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
{
var queue = (GrpcWorkerConnection)connection;
await queue.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -149,6 +165,20 @@ internal Task ConnectToWorkerProcess(IAsyncStreamReader<Message> requestStream,
_workers[workerProcess] = workerProcess;
return workerProcess.Completion;
}
public async ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default)
{
var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId));
_agentState[agentId.Key] = value;
}

public async ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default)
{
if (_agentState.TryGetValue(agentId.Key, out var state))
{
return state;
}
return new AgentState { AgentId = agentId };
}
internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess)
{
_workers.TryRemove(workerProcess, out _);
Expand All @@ -169,4 +199,31 @@ internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess)
}
}
}
public async ValueTask<RpcResponse> InvokeRequest(RpcRequest request, CancellationToken cancellationToken = default)
{
(string Type, string Key) agentId = (request.Target.Type, request.Target.Key);
if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted)
{
// Activate the agent on a compatible worker process.
if (_supportedAgentTypes.TryGetValue(request.Target.Type, out var workers))
{
connection = workers[Random.Shared.Next(workers.Count)];
_agentDirectory[agentId] = connection;
}
else
{
return new(new RpcResponse { Error = "Agent not found." });
}
}
// Proxy the request to the agent.
var originalRequestId = request.RequestId;
var newRequestId = Guid.NewGuid().ToString();
var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously);
request.RequestId = newRequestId;
await connection.ResponseStream.WriteAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
// Wait for the response and send it back to the caller.
var response = await completion.Task.WaitAsync(s_agentResponseTimeout);
response.RequestId = originalRequestId;
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@

namespace Microsoft.AutoGen.Agents;

public sealed class RegistryGrain : Grain, IAgentRegistry, IGrainWithIntegerKey
public sealed class RegistryGrain : Grain, IGrainWithIntegerKey
{
// TODO: use persistent state for some of these or (better) extend Orleans to implement some of this natively.
private readonly Dictionary<IGateway, WorkerState> _workerStates = [];
private readonly Dictionary<string, List<IGateway>> _supportedAgentTypes = [];
private readonly Dictionary<(string Type, string Key), IGateway> _agentDirectory = [];
private readonly Dictionary<GrpcGateway, WorkerState> _workerStates = new();
private readonly Dictionary<string, List<GrpcGateway>> _supportedAgentTypes = [];
private readonly Dictionary<(string Type, string Key), GrpcGateway> _agentDirectory = [];
private readonly TimeSpan _agentTimeout = TimeSpan.FromMinutes(1);

public override Task OnActivateAsync(CancellationToken cancellationToken)
{
this.RegisterGrainTimer(static state => state.PurgeInactiveWorkers(), this, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
return base.OnActivateAsync(cancellationToken);
}
public ValueTask<(IGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId)
public ValueTask<(GrpcGateway? Worker, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId)
{
// TODO:
bool isNewPlacement;
Expand All @@ -44,7 +44,7 @@ public override Task OnActivateAsync(CancellationToken cancellationToken)
}
return new((worker, isNewPlacement));
}
public ValueTask RemoveWorker(IGateway worker)
public ValueTask RemoveWorker(GrpcGateway worker)
{
if (_workerStates.Remove(worker, out var state))
{
Expand All @@ -58,7 +58,7 @@ public ValueTask RemoveWorker(IGateway worker)
}
return ValueTask.CompletedTask;
}
public ValueTask RegisterAgentType(string type, IGateway worker)
public ValueTask RegisterAgentType(string type, GrpcGateway worker)
{
if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes))
{
Expand All @@ -73,12 +73,12 @@ public ValueTask RegisterAgentType(string type, IGateway worker)
workerState.SupportedTypes.Add(type);
return ValueTask.CompletedTask;
}
public ValueTask AddWorker(IGateway worker)
public ValueTask AddWorker(GrpcGateway worker)
{
GetOrAddWorker(worker);
return ValueTask.CompletedTask;
}
public ValueTask UnregisterAgentType(string type, IGateway worker)
public ValueTask UnregisterAgentType(string type, GrpcGateway worker)
{
if (_workerStates.TryGetValue(worker, out var state))
{
Expand Down Expand Up @@ -111,7 +111,7 @@ private Task PurgeInactiveWorkers()
return Task.CompletedTask;
}

private WorkerState GetOrAddWorker(IGateway worker)
private WorkerState GetOrAddWorker(GrpcGateway worker)
{
if (!_workerStates.TryGetValue(worker, out var workerState))
{
Expand All @@ -122,9 +122,9 @@ private WorkerState GetOrAddWorker(IGateway worker)
return workerState;
}

public ValueTask<IGateway?> GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type));
public ValueTask<GrpcGateway?> GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type));

private IGateway? GetCompatibleWorkerCore(string type)
private GrpcGateway? GetCompatibleWorkerCore(string type)
{
if (_supportedAgentTypes.TryGetValue(type, out var workers))
{
Expand Down
Loading

0 comments on commit 1b3bf38

Please sign in to comment.