Skip to content

Commit

Permalink
read events
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet committed Nov 6, 2024
1 parent 81a79d5 commit 4f1ed1e
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public AgentWorker(
IServiceProvider serviceProvider,
[FromKeyedServices("AgentTypes")] IEnumerable<Tuple<string, Type>> configuredAgentTypes,
ILogger<GrpcAgentWorker> logger,
DistributedContextPropagator distributedContextPropagator)
DistributedContextPropagator distributedContextPropagator)
{
_logger = logger;
_serviceProvider = serviceProvider;
Expand All @@ -60,7 +60,7 @@ public AgentWorker(
public InMemoryQueue<CloudEvent> GetEventQueue() => _eventsQueue;
public async ValueTask PublishEventAsync(CloudEvent evt, CancellationToken cancellationToken = default)
{
await this.WriteAsync(evt,cancellationToken).ConfigureAwait(false);
await this.WriteAsync(evt, cancellationToken).ConfigureAwait(false);
}
public ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -134,17 +134,26 @@ await WriteChannelAsync(new Message
}, cancellationToken).ConfigureAwait(false);
}
}
public async Task RunReadPump()
public async Task RunReadPump()
{
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
try
{
await foreach (var message in _messageQueue.Reader.ReadAllAsync(_shutdownCancellationToken.Token))
await foreach (var message in _messageQueue.Reader.ReadAllAsync(_shutdownCancellationToken.Token).ConfigureAwait(false))
{

// Fire and forget
_gateway.OnReceivedMessageAsync(this, message).Ignore();
}
await foreach (var message in _eventsQueue.Reader.ReadAllAsync(_shutdownCancellationToken.Token).ConfigureAwait(false))
{

foreach (var (typeName, _) in _agentTypes)
{
var agent = GetOrActivateAgent(new AgentId(typeName, message.Source));
agent.ReceiveMessage(new Message { CloudEvent = message });
}
}
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -190,7 +199,6 @@ void StartCore()
}
}
}

public async Task StopAsync(CancellationToken cancellationToken)
{
_shutdownCts.Cancel();
Expand Down Expand Up @@ -257,7 +265,7 @@ private async Task RunWritePump()
}
}
}
private IAgentBase GetOrActivateAgent(AgentId agentId)
private IAgentBase GetOrActivateAgent(AgentId agentId)
{
if (!_agents.TryGetValue((agentId.Type, agentId.Key), out var agent))
{
Expand Down

0 comments on commit 4f1ed1e

Please sign in to comment.