Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
kostapetan committed Nov 22, 2024
1 parent 16dbe23 commit 985b165
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 51 deletions.
14 changes: 6 additions & 8 deletions dotnet/samples/Hello-distributed/Backend/Agents/HelloAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,21 @@ namespace Backend.Agents;
[TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase(
[FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<HelloAgent> logger) : AgentBase(
context,
typeRegistry),
IHandleConsole,
typeRegistry, logger),
IHandle<NewGreetingRequested>
{
public async Task Handle(NewGreetingRequested item)
{
_logger.LogInformation($"HelloAgent with Id: {AgentId} received NewGreetingRequested with {item.Message}");
var response = await SayHello(item.Message).ConfigureAwait(false);
var evt = new Output { Message = response };
await PublishMessageAsync(evt).ConfigureAwait(false);
var goodbye = new NewGreetingGenerated
var greeting = new NewGreetingGenerated
{
UserId = AgentId.Key,
UserMessage = "Goodbye"
UserMessage = response
};
await PublishMessageAsync(goodbye).ConfigureAwait(false);
await PublishMessageAsync(greeting).ConfigureAwait(false);
}

public async Task<string> SayHello(string ask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ namespace Backend.Agents;
[TopicSubscription("HelloAgents")]
public class OutputAgent(
IAgentRuntime context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : AgentBase(
[FromKeyedServices("EventTypes")] EventTypes typeRegistry, ILogger<OutputAgent> logger) : AgentBase(
context,
typeRegistry),
IHandleConsole,
typeRegistry, logger),
IHandle<NewGreetingGenerated>
{
public async Task Handle(NewGreetingGenerated item)
{
// TODO: store to memory

_logger.LogInformation($"OutputAgent with Id: {AgentId} received NewGreetingGenerated with {item.UserMessage}");
}
}
5 changes: 3 additions & 2 deletions dotnet/samples/Hello-distributed/Backend/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@

var agentHostUrl = builder.Configuration["AGENT_HOST"]!;
builder.AddAgentWorker(agentHostUrl)
.AddAgent<HelloAgent>(nameof(HelloAgent));
.AddAgent<HelloAgent>(nameof(HelloAgent))
.AddAgent<OutputAgent>(nameof(OutputAgent));

builder.Services.AddSingleton<AgentWorker>();

var app = builder.Build();

app.MapDefaultEndpoints();

app.MapPost("/sessions", async ([FromBody]string message, AgentWorker client) =>
app.MapPost("/sessions", async ([FromBody]string message, Client client) =>
{
var session = Guid.NewGuid().ToString();
await client.PublishEventAsync(new NewGreetingRequested { Message = message }.ToCloudEvent(session));
Expand Down
44 changes: 21 additions & 23 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,36 +225,34 @@ public Task CallHandler(CloudEvent item)
{
// Only send the event to the handler if the agent type is handling that type
// foreach of the keys in the EventTypes.EventsMap[] if it contains the item.type
foreach (var key in EventTypes.EventsMap.Keys)

foreach (var type in EventTypes.TypesMap[item.Type])
{
if (EventTypes.EventsMap[key].Contains(item.Type))
{
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
var genericInterfaceType = typeof(IHandle<>).MakeGenericType(EventTypes.Types[item.Type]);
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
var genericInterfaceType = typeof(IHandle<>).MakeGenericType(EventTypes.Types[item.Type]);

MethodInfo methodInfo;
try
MethodInfo methodInfo;
try
{
// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");
return methodInfo.Invoke(this, [payload]) as Task ?? Task.CompletedTask;
}
else
{
// The error here is we have registered for an event that we do not have code to listen to
throw new InvalidOperationException($"No handler found for event '{item.Type}'; expecting IHandle<{item.Type}> implementation.");
}
methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");
return methodInfo.Invoke(this, [payload]) as Task ?? Task.CompletedTask;
}
catch (Exception ex)
else
{
_logger.LogError(ex, $"Error invoking method {nameof(IHandle<object>.Handle)}");
throw; // TODO: ?
// The error here is we have registered for an event that we do not have code to listen to
throw new InvalidOperationException($"No handler found for event '{item.Type}'; expecting IHandle<{item.Type}> implementation.");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error invoking method {nameof(IHandle<object>.Handle)}");
throw; // TODO: ?
}
}

return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,22 @@ public static AgentApplicationBuilder AddAgentWorker(this IHostApplicationBuilde
var descriptors = pairs.Select(t => t.Item2);
var typeRegistry = TypeRegistry.FromMessages(descriptors);
var types = pairs.ToDictionary(item => item.Item2?.FullName ?? "", item => item.t);
var eventsMap = AppDomain.CurrentDomain.GetAssemblies()
var typesForEvents = new Dictionary<string, HashSet<Type>>();
var eventsForType = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
.Where(type => ReflectionHelper.IsSubclassOfGeneric(type, typeof(AgentBase)) && !type.IsAbstract)
.Select(t => (t, t.GetInterfaces()
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>))
.Select(i => (GetMessageDescriptor(i.GetGenericArguments().First())?.FullName ?? "")).ToHashSet()))
.ToDictionary(item => item.t, item => item.Item2);
.Select(t =>
{
var events = t.GetInterfaces()
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>))
.Select(i => (GetMessageDescriptor(i.GetGenericArguments().First())?.FullName ?? "")).ToHashSet();
foreach(var evt in events) {
if (!typesForEvents.TryGetValue(evt, out var value)) { value = new HashSet<Type>(); typesForEvents[evt] = value; }
value.Add(t);
}
return (t, events);
}).ToDictionary(item => item.t, item => item.Item2);
// if the assembly contains any interfaces of type IHandler, then add all the methods of the interface to the eventsMap
var handlersMap = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(assembly => assembly.GetTypes())
Expand All @@ -72,30 +80,30 @@ public static AgentApplicationBuilder AddAgentWorker(this IHostApplicationBuilde
{
foreach (var iface in item)
{
if (eventsMap.TryGetValue(iface.Item2, out var events))
if (eventsForType.TryGetValue(iface.Item2, out var events))
{
events.UnionWith(iface.Item3);
}
else
{
eventsMap[iface.Item2] = iface.Item3;
eventsForType[iface.Item2] = iface.Item3;
}
}
}
// merge the handlersMap into the eventsMap
foreach (var item in handlersMap)
{
if (eventsMap.TryGetValue(item.Key, out var events))
if (eventsForType.TryGetValue(item.Key, out var events))
{
events.UnionWith(item.Value);
}
else
{
eventsMap[item.Key] = item.Value;
eventsForType[item.Key] = item.Value;
}
}
return new EventTypes(typeRegistry, types, eventsMap);
return new EventTypes(typeRegistry, types, eventsForType, typesForEvents);
});
builder.Services.AddSingleton<Client>();
return new AgentApplicationBuilder(builder);
Expand Down Expand Up @@ -141,11 +149,12 @@ public sealed class AgentTypes(Dictionary<string, Type> types)
return new AgentTypes(agents);
}
}
public sealed class EventTypes(TypeRegistry typeRegistry, Dictionary<string, Type> types, Dictionary<Type, HashSet<string>> eventsMap)
public sealed class EventTypes(TypeRegistry typeRegistry, Dictionary<string, Type> types, Dictionary<Type, HashSet<string>> eventsMap, Dictionary<string, HashSet<Type>> typesMap)
{
public TypeRegistry TypeRegistry { get; } = typeRegistry;
public Dictionary<string, Type> Types { get; } = types;
public Dictionary<Type, HashSet<string>> EventsMap { get; } = eventsMap;
public Dictionary<string, HashSet<Type>> TypesMap { get; } = typesMap;
}

public sealed class AgentApplicationBuilder(IHostApplicationBuilder builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class AgentBaseTests
public async Task ItInvokeRightHandlerTestAsync()
{
var mockContext = new Mock<IAgentRuntime>();
var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []), new Logger<AgentBase>(new LoggerFactory()));
var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], [], []), new Logger<AgentBase>(new LoggerFactory()));

await agent.HandleObject("hello world");
await agent.HandleObject(42);
Expand Down

0 comments on commit 985b165

Please sign in to comment.