Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatcher (aka "dynamic handoffs") design proposal #4739

Open
JMLX42 opened this issue Dec 17, 2024 · 7 comments
Open

Dispatcher (aka "dynamic handoffs") design proposal #4739

JMLX42 opened this issue Dec 17, 2024 · 7 comments

Comments

@JMLX42
Copy link
Contributor

JMLX42 commented Dec 17, 2024

What feature would you like to be added?

Hello,

The current handoff system is very limiting. Specifically because the actual list of handoffs might change during the execution of the agentic system. But also because they have fixed messages that do not necessarily convey the exact goal/purpose of the handoff to the target agent.

In order to solve this, I propose an abstraction over List[Handoff] called Dispatcher:

from abc import ABC, abstractmethod
from collections.abc import Sequence
import logging
from typing import AsyncGenerator, Dict, List

from autogen_agentchat.base import Handoff
from autogen_agentchat.base._chat_agent import Response
from autogen_agentchat.messages import AgentMessage, ChatMessage
from autogen_core import CancellationToken
from autogen_core.tools import FunctionTool
from autogen_core.models import ChatCompletionClient
from autogen_workbench.workbench import (
    AbstractDynamicWorkbench,
    DynamicWorkbench,
    WorkbenchAgent,
)

logger = logging.getLogger(__name__)


class AbstractDispatcher(ABC):
    @property
    @abstractmethod
    def handoffs(self) -> List[Handoff]:
        """The handoffs that the dispatch can handle."""
        ...


class AbstractDynamicDispatcher(ABC):
    @abstractmethod
    def create_handoff(self, name: str, target: str, description: str, message: str) -> Handoff:
        """Create and add a handoff."""
        ...

    @abstractmethod
    def add_handoff(self, handoff: Handoff, enabled: bool = True) -> None:
        """Add a handoff."""
        ...

    @abstractmethod
    def delete_handoff(self, handoff_name: str) -> None:
        """Delete a handoff."""
        ...

    @abstractmethod
    def delete_all_handoffs(self) -> None:
        """Delete all handoffs."""
        ...

    @abstractmethod
    def enable_handoff(self, handoff_name: str) -> None:
        """Enable a handoff."""
        ...

    @abstractmethod
    def disable_handoff(self, handoff_name: str) -> None:
        """Disable a handoff."""
        ...

    @property
    @abstractmethod
    def enabled_handoffs(self) -> List[Handoff]:
        """The enabled handoffs."""
        ...


class Dispatcher(AbstractDispatcher):
    """A simple abstraction over a static list of handoffs."""

    def __init__(self, handoffs: List[Handoff]):
        self._handoffs: Dict[str, Handoff] = {handoff.name: handoff for handoff in handoffs}

    def handoffs(self) -> List[Handoff]:
        return list(self._handoffs.values())


class DynamicDispatcher(Dispatcher, AbstractDynamicDispatcher):
    """
    A dynamic dispatcher that can dynamically create, add/remove and enable/disable handoffs
    to an authorized list of agents.
    """

    def __init__(self, authorized_agents: List[str] | None = None, handoffs: List[Handoff] | None = None):
        super().__init__(handoffs=[])

        self._authorized_agents: List[str] = authorized_agents or []
        self._enabled_handoffs: Dict[str, bool] = {}

        if handoffs is not None:
            for handoff in handoffs:
                self.add_handoff(handoff, enabled=True)

    def create_handoff(self, name: str, target: str, description: str, message: str) -> Handoff:
        """
        Create and add a handoff.

        Args:
            name (str): The name of the target agent to handoff to. It must be a valid Python identifier.
            target (str): The target of the handoff.
            description (str): The description of the handoff such as the condition under which it should happen and
                the target agent's ability. The description must be 1024 characters or less.
            message (str): The message to the target agent.

        Returns:
            Handoff: The created handoff.

        Raises:
            ValueError: If the handoff name is not a valid Python identifier.

        Examples:
            handoff = create_handoff(
                name="process_payment",
                target="payment_processor_agent",
                description="Handoff when a payment needs processing, handled by the payment processor agent.",
                message="Please process the payment for order #12345."
            )
        """
        logger.info(f"creating handoff {name}.")
        if len(description) > 1024:
            raise ValueError("Description must be 1024 characters or less.")
        if target not in self._authorized_agents:
            raise ValueError(f"Target agent {target} is not authorized.")
        handoff = self._create_handoff(name, target, description, message)
        self.add_handoff(handoff, enabled=True)
        logger.info(f"created handoff {handoff}.")
        return handoff

    def _create_handoff(self, name: str, target: str, description: str, message: str) -> Handoff:
        return Handoff(target=target, description=description, name=name, message=message)

    def add_handoff(self, handoff: Handoff, enabled: bool = True) -> None:
        if handoff.name in self._handoffs:
            raise ValueError(f"Handoff name {handoff.name} already exists.")
        if handoff.target not in self._authorized_agents:
            raise ValueError(f"Target agent {handoff.target} is not authorized.")
        if isinstance(handoff, Handoff):
            self._handoffs[handoff.name] = handoff
        else:
            raise ValueError(f"Unsupported handoff type: {type(handoff)}")
        self._enabled_handoffs[handoff.name] = enabled

    def delete_handoff(self, handoff_name: str) -> None:
        self._handoffs.pop(handoff_name)
        self._enabled_handoffs.pop(handoff_name)

    def delete_all_handoffs(self) -> None:
        self._handoffs.clear()

    def enable_handoff(self, handoff_name) -> None:
        if handoff_name not in self._enabled_handoffs:
            raise ValueError(f"Handoff with name {handoff_name} does not exist.")
        self._enabled_handoffs[handoff_name] = True

    def disable_handoff(self, handoff_name):
        if handoff_name not in self._enabled_handoffs:
            raise ValueError(f"Handoff with name {handoff_name} does not exist.")
        self._enabled_handoffs[handoff_name] = False

    @property
    def enabled_handoffs(self) -> List[Handoff]:
        return [handoff for name, handoff in self._handoffs.items() if self._enabled_handoffs.get(name, False)]

With a DynamicDispatcher, it is then possible to create a DynamicDispatcherAgent that will both:

  • accept user-defined handoffs
  • be able to create its own custom handoffs (to a list of authorized agents)

Note: this implementation is based on the Workbench proposal (cf #4721).

class DynamicDispatcherAgent(WorkbenchAgent):
    """An agent that can dynamically create and call custom handoffs on its own."""

    def __init__(
        self,
        name: str,
        model_client: ChatCompletionClient,
        workbench: AbstractDynamicWorkbench | None = None,
        dispatcher: AbstractDynamicDispatcher | None = None,
        description: str = "An agent that provides assistance with ability to use tools and dynamic handoffs.",
        system_message: str
        | None = "You are a helpful AI assistant. Solve tasks using your tools. Reply with TERMINATE when the task "
        "has been completed.",
        reflect_on_tool_use: bool = False,
        tool_call_summary_format: str = "{result}",
    ):
        if model_client.capabilities["function_calling"] is False:
            raise ValueError("The model does not support function calling, which is needed for handoffs.")

        workbench = workbench or DynamicWorkbench(name=f"{name}_workbench")
        dispatcher = dispatcher or DynamicDispatcher()
        workbench.add_tool(
            FunctionTool(dispatcher.create_handoff, dispatcher.create_handoff.__doc__, name="create_handoff")
        )

        super().__init__(
            name=name,
            model_client=model_client,
            workbench=workbench,
            description=description,
            system_message=system_message,
            reflect_on_tool_use=reflect_on_tool_use,
            tool_call_summary_format=tool_call_summary_format,
            handoffs=[],
        )

        self._dispatcher = dispatcher

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentMessage | Response, None]:
        self._handoffs = {handoff.name: handoff for handoff in self._dispatcher.enabled_handoffs}
        self._handoff_tools = [handoff.handoff_tool for handoff in self._handoffs.values()]

        # Validate that handoffs and tools do not share the same name.
        handoff_tool_names = [tool.name for tool in self._handoff_tools]
        tool_names = [tool.name for tool in self.workbench.enabled_tools]
        if set(handoff_tool_names).intersection(tool_names):
            raise ValueError("Handoff and tools cannot share the same name. Please rename one of the tools.")

        async for msg in super().on_messages_stream(messages, cancellation_token):
            yield msg

        self._handoffs.clear()
        self._handoff_tools.clear()

The following is then possible:

workbench_search_agent = WorkbenchSearchAgent(
        name="workbench_search_agent",
        model_client=model_client,
        workbench=openapi_agent.workbench,
        handoffs=[
            Handoff(
                target="planning_agent",
                description="Hand off your complete written analysis of available tools to the planning agent",
                message="I have analyzed the available tools.",
            ),
        ],
    )

    planning_agent = DynamicDispatcherAgent(
        name="planning_agent",
        model_client=model_client,
        system_message=(
            "You are a planning agent specialized in creating detailed execution plans. "
            "Follow these steps precisely in order:\n"
            "1. Create a list of high-level tasks needed to fulfill the user's request\n"
            "2. For each task:\n"
            "   - create a handoff for the workbench_search_agent handoff to discover available tools for that specific task\n"
            "   - perform that handoff\n"
            "3. After receiving tool information for each task, create a detailed step-by-step plan incorporating the specific tools\n"
            "4. Hand off the complete plan to the coordinator agent\n\n"
            "Important: You must use the workbench_search_agent handoff before attempting to create the final plan."
            "Only after you have created the complete step-by-step plan with all required tools, handoff to coordinator_agent."
        ),
        dispatcher=DynamicDispatcher(authorized_agents=["coordinator_agent", workbench_search_agent.name]),
    )

In this case, the planning_agent will create very specific handoffs to the workbench_agent:

2024-12-17 18:00:56 INFO     autogen_dispatch.dispatch create_handoff created handoff target='coordinator_agent' description='The complete step-by-step plan for listing glTFs using the glTF Live API, including preparation, execution, response handling, and error management.' name='complete_plan' message='Here is the detailed plan to list all available glTF resources using the glTF Live API. Execute the `list_gltfs` function to retrieve and present the data as specified in the plan. Handle any errors by adjusting parameters if necessary.'.
2024-12-17 18:05:38 INFO     autogen_dispatch.dispatch create_handoff created handoff target='workbench_search_agent' description='Handoff to discover available tools for working with the glTF Live API, specifically for finding nodes and their material names in a scene.' name='workbench_search_gltf_live_api_nodes' message='Please provide available tools for finding nodes and their material names in a scene using the glTF Live API.'.

Why is this needed?

With the Workbench proposal (cf #4721), the list of tools can be made entirely dynamic. And with this new proposal, so are the handoffs.

This proposal effectively makes it possible to build completely dynamic agent systems. Only the list of agents in the team remains fixed by the developer (for now...).

  • Evolving Requirements: In autogen-driven agentic systems, the set of tasks and their requirements may change over time, making a static predefined list of handoffs insufficient.

  • Dynamic Adaptation: By allowing dynamic creation, modification, and removal of handoffs, agents can adapt to new goals, contexts, or discovered capabilities as the conversation or problem-solving session unfolds.

  • Flexible Messaging: Fixed, static messages often fail to convey the precise intent or context when a handoff is needed. Dynamic handoffs enable agents to craft more contextually relevant and informative messages, ensuring smoother collaboration between agents.

  • Authorized Targets: With a dynamic dispatcher, agents can be given a list of authorized targets, ensuring that only specific agents can receive certain handoffs. This maintains security and operational constraints while still allowing flexibility.

  • Enhanced Autonomy: Allowing agents to create and manage their own handoffs without human intervention fosters greater autonomy and richer workflows, ultimately making the autogen framework more powerful and scalable.

@rysweet
Copy link
Collaborator

rysweet commented Dec 17, 2024

Thanks @JMLX42!
My rough take is that this is really what the autogen_core is for (ie you shouldn't need agent chat for this scenario) but it's possible the agent chat folks will be excited to dig into this.
Note that these might be easier to evaluate if opened as a PR. Excited to talk with you soon!

@JMLX42
Copy link
Contributor Author

JMLX42 commented Dec 17, 2024

My rough take is that this is really what the autogen_core is for (ie you shouldn't need agent chat for this scenario)

@rysweet could you please explain a bit more what you mean by that?

@rysweet
Copy link
Collaborator

rysweet commented Dec 17, 2024

@JMLX42 autogen_core is all about actor models reacting to events, so to model a handoff system you could define events for each task/request and events for task/request completion and define agents that consume those different events and I think you would not need agent chat - no real need for the agents to chat with one another vs just pass events to one another through the system.

Note that I think to do this well with autogen_core we do need that "unsubscribe" api that is missing #4357 - coming soon.

@JMLX42
Copy link
Contributor Author

JMLX42 commented Dec 17, 2024

you could define events for each task/request and events for task/request completion and define agents that consume those different events

@rysweet by "define" you mean "write Python code"? If that's the case, that's exactly what this proposal wants to avoid.

The goal is to handle dynamic scenarios where the agents need to pass information the developer has no prior knowledge of. For example, imagine an agent loading its tools from an API. The list of tools is not known in advance. Thus, if the handoffs between my agents are defined beforehand, they will only handle a fraction of the possible scenarios/outcome.

In the example above, the planning_agent has to do a handoff for each step of the plan. But there is no way for the developer to know beforehand how many steps the plan has. And each step might need a very specific Handoff.message to convey the proper meaning to the target agent.

I understand autogen_core provides "topics" (cf this doc). But then again - and I might be missing something - but all of this has to be defined beforehand in the Python code. I might be misunderstanding, but agents cannot simply create new topics based on the on-going execution. Can they?

@ekzhu
Copy link
Collaborator

ekzhu commented Dec 17, 2024

@rysweet I think having the issue as a design proposal is a great first step here. It gives us a bit more time to think about the implications.

@JMLX42 thanks for the proposal. I think the continuous self-adaptation is interesting here. Do you think this idea of dynamic dispatcher is an orchestration-level consideration rather than at each individual agent level? For example, I have seen approaches to generate a complete agent team from a given task. There is a separate effort here: #4226

@JMLX42
Copy link
Contributor Author

JMLX42 commented Dec 18, 2024

Do you think this idea of dynamic dispatcher is an orchestration-level consideration rather than at each individual agent level?

@ekzhu IMHO it works for both. The same Dispatcher instance (s) can be shared by multiple agents. Or not.

If this kind of feature is too high level for autogen_chat, we might at least change the type of tools to something like Iterable[Handoff]. It would allow the party packages to implement their own Handoff abstraction. That would be a good first step.

@ekzhu
Copy link
Collaborator

ekzhu commented Dec 18, 2024

If this kind of feature is too high level for autogen_chat, we might at least change the type of tools to something like Iterable[Handoff]. It would allow the party packages to implement their own Handoff abstraction. That would be a good first step.

I agree, same with tools. Making it an option to use collection type in the constructor argument and perform conversion internally if the input argument is not the collection type.

Let's discuss these in our office hours? https://aka.ms/autogen-officehour, we can also use discord https://aka.ms/autogen-discord

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants