Skip to content

Commit

Permalink
add SendMessageAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet committed Nov 12, 2024
1 parent 3530a99 commit 55147a9
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 24 deletions.
1 change: 1 addition & 0 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IAgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public interface IAgentRuntime
ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default);
ValueTask SendResponseAsync(RpcRequest request, RpcResponse response, CancellationToken cancellationToken = default);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default);
ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default);
ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default);
void Update(Activity? activity, RpcRequest request);
void Update(Activity? activity, CloudEvent cloudEvent);
Expand Down
1 change: 1 addition & 0 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IAgentWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public interface IAgentWorker
ValueTask PublishEventAsync(CloudEvent evt, CancellationToken cancellationToken = default);
ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, CancellationToken cancellationToken = default);
ValueTask SendResponseAsync(RpcResponse response, CancellationToken cancellationToken = default);
ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default);
ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default);
ValueTask<AgentState> ReadAsync(AgentId agentId, CancellationToken cancellationToken = default);
}
4 changes: 4 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public async ValueTask SendRequestAsync(IAgentBase agent, RpcRequest request, Ca
{
await worker.SendRequestAsync(agent, request, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default)
{
await worker.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
public async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default)
{
await worker.PublishEventAsync(@event, cancellationToken).ConfigureAwait(false);
Expand Down
4 changes: 4 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public ValueTask SendResponseAsync(RpcResponse response, CancellationToken cance
{
return _mailbox.Writer.WriteAsync(new Message { Response = response }, cancellationToken);
}
public ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default)
{
return _mailbox.Writer.WriteAsync(message, cancellationToken);
}
public ValueTask StoreAsync(AgentState value, CancellationToken cancellationToken = default)
{
var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ await WriteChannelAsync(new Message
await WriteChannelAsync(new Message { Request = request }, cancellationToken).ConfigureAwait(false);
}
// new is intentional
public new async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default)
{
await WriteChannelAsync(message, cancellationToken).ConfigureAwait(false);
}
// new is intentional
public new async ValueTask PublishEventAsync(CloudEvent @event, CancellationToken cancellationToken = default)
{
await WriteChannelAsync(new Message { CloudEvent = @event }, cancellationToken).ConfigureAwait(false);
Expand Down
30 changes: 6 additions & 24 deletions dotnet/test/Microsoft.AutoGen.Agents.Tests/GrpcGatewayTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,7 @@ private async ValueTask Setup()
[Fact]
public async Task AddSubscriptionAsync_SendsAddSubscriptionRequest_AndChecksAddSubscriptionResponse()
{
// Arrange
var request = new AddSubscriptionRequest
{
RequestId = "test-request-id",
Subscription = new Subscription
{
TypeSubscription = new TypeSubscription
{
TopicType = "test-topic",
AgentType = "test-agent-type"
}
}
};


var responseStream = new Mock<IServerStreamWriter<Message>>();
_mockConnection.SetupGet(c => c.ResponseStream).Returns(responseStream.Object);

Check failure on line 31 in dotnet/test/Microsoft.AutoGen.Agents.Tests/GrpcGatewayTests.cs

View workflow job for this annotation

GitHub Actions / Dotnet Build (ubuntu-latest, 3.11)

Avoid multiple blank lines

Check failure on line 31 in dotnet/test/Microsoft.AutoGen.Agents.Tests/GrpcGatewayTests.cs

View workflow job for this annotation

GitHub Actions / Dotnet Build (macos-latest, 3.11)

Avoid multiple blank lines
// Act
await _grpcGateway.AddSubscriptionAsync(_mockConnection.Object, request);

// Assert
responseStream.Verify(stream => stream.WriteAsync(It.Is<Message>(msg =>
Expand All @@ -64,13 +45,14 @@ public class TestAgent(
{
public async Task Handle(NewMessageReceived item)
{

// update our subscription requests
await SendSubscriptionRequestAsync().ConfigureAwait(false);
}
private async SendSubscriptionRequestAsync()
private async ValueTask SendSubscriptionRequestAsync(CancellationToken cancellationToken = default)
{
RpcRequest request = new()
Message request = new()
{
Payload = new AddSubscriptionRequest()
AddSubscriptionRequest = new()
{
RequestId = "test-request-id",
Subscription = new Subscription
Expand All @@ -83,7 +65,7 @@ private async SendSubscriptionRequestAsync()
}
}
};
return this.Context.SendRequestAsync(this,request);
await Context.SendMessageAsync(request, cancellationToken).ConfigureAwait(false);
}
}
}

0 comments on commit 55147a9

Please sign in to comment.