Skip to content

Commit

Permalink
[CAP] Abstraction of actor_connector to go along with runtime factory…
Browse files Browse the repository at this point in the history
… and runtime abstraction (#3296)

* Added Runtime Factory to support multiple implementations

* Rename to ComponentEnsemble to ZMQRuntime

* rename zmq_runtime

* rename zmq_runtime

* pre-commit fixes

* pre-commit fix

* pre-commit fixes and default runtime

* pre-commit fixes

* Rename constants

* Rename Constants

* Create interfaces for connectors

* pre-commit fixes

* pre-commit fixes

* pre-commit fixes

* lower case file names

* rename files to lower _case

* rename files to _lowercase

* removed _

* Refactored to make Actor zmq agnostic

* fix for refactor

* fix refactor, circular dependency

* pre-commit fixes

* document classes

* pre-commit ruff

* fix ruff issues

* ruff fixes

* ruff fixes

* actor connector documentation

* better docs

---------

Co-authored-by: Li Jiang <[email protected]>
Co-authored-by: Chi Wang <[email protected]>
Co-authored-by: Ryan Sweet <[email protected]>
Co-authored-by: Eric Zhu <[email protected]>
  • Loading branch information
5 people authored Oct 22, 2024
1 parent 8a2a40d commit 1c5baf0
Show file tree
Hide file tree
Showing 38 changed files with 401 additions and 263 deletions.
1 change: 1 addition & 0 deletions samples/apps/cap/py/autogencap/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
xsub_url: str = "tcp://127.0.0.1:5556"
router_url: str = "tcp://127.0.0.1:5557"
dealer_url: str = "tcp://127.0.0.1:5558"
USE_COLOR_LOGGING = True
Original file line number Diff line number Diff line change
@@ -1,57 +1,38 @@
import threading
import traceback

import zmq
from .actor_runtime import IMessageReceiver, IMsgActor, IRuntime
from .debug_log import Debug, Info

from .Config import xpub_url
from .DebugLog import Debug, Error, Info


class Actor:
class Actor(IMsgActor):
def __init__(self, agent_name: str, description: str, start_thread: bool = True):
"""Initialize the Actor with a name, description, and threading option."""
self.actor_name: str = agent_name
self.agent_description: str = description
self.run = False
self._start_event = threading.Event()
self._start_thread = start_thread
self._msg_receiver: IMessageReceiver = None
self._runtime: IRuntime = None

def on_connect(self, network):
Debug(self.actor_name, f"is connecting to {network}")
def on_connect(self):
"""Connect the actor to the runtime."""
Debug(self.actor_name, f"is connecting to {self._runtime}")
Debug(self.actor_name, "connected")

def on_txt_msg(self, msg: str, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming text messages."""
Info(self.actor_name, f"InBox: {msg}")
return True

def on_bin_msg(self, msg: bytes, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming binary messages."""
Info(self.actor_name, f"Msg: receiver=[{receiver}], msg_type=[{msg_type}]")
return True

def _msg_loop_init(self):
Debug(self.actor_name, "recv thread started")
self._socket: zmq.Socket = self._context.socket(zmq.SUB)
self._socket.setsockopt(zmq.RCVTIMEO, 500)
self._socket.connect(xpub_url)
str_topic = f"{self.actor_name}"
Debug(self.actor_name, f"subscribe to: {str_topic}")
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
self._start_event.set()

def get_message(self):
try:
topic, msg_type, sender_topic, msg = self._socket.recv_multipart()
topic = topic.decode("utf-8") # Convert bytes to string
msg_type = msg_type.decode("utf-8") # Convert bytes to string
sender_topic = sender_topic.decode("utf-8") # Convert bytes to string
except zmq.Again:
return None # No message received, continue to next iteration
except Exception as e:
Error(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
return None
return topic, msg_type, sender_topic, msg

def dispatch_message(self, message):
"""Dispatch the received message based on its type."""
if message is None:
return
topic, msg_type, sender_topic, msg = message
Expand All @@ -65,40 +46,50 @@ def dispatch_message(self, message):
if not self.on_bin_msg(msg, msg_type, topic, sender_topic):
self.run = False

def get_message(self):
"""Retrieve a message from the runtime implementation."""
return self._msg_receiver.get_message()

def _msg_loop(self):
"""Main message loop for receiving and dispatching messages."""
try:
self._msg_loop_init()
self._msg_receiver = self._runtime.get_new_msg_receiver()
self._msg_receiver.init(self.actor_name)
self._start_event.set()
while self.run:
message = self.get_message()
message = self._msg_receiver.get_message()
self.dispatch_message(message)
except Exception as e:
Debug(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
finally:
self.run = False
# In case there was an exception at startup signal
# the main thread.
self._start_event.set()
self.run = False
Debug(self.actor_name, "recv thread ended")

def on_start(self, context: zmq.Context):
self._context = context
self.run: bool = True
def on_start(self, runtime: IRuntime):
"""Start the actor and its message receiving thread if applicable."""
self._runtime = runtime # Save the runtime
self.run = True
if self._start_thread:
self._thread = threading.Thread(target=self._msg_loop)
self._thread.start()
self._start_event.wait()
else:
self._msg_loop_init()
self._msg_receiver = self._runtime.get_new_msg_receiver()
self._msg_receiver.init(self.actor_name)

def disconnect_network(self, network):
"""Disconnect the actor from the network."""
Debug(self.actor_name, f"is disconnecting from {network}")
Debug(self.actor_name, "disconnected")
self.stop()

def stop(self):
"""Stop the actor and its message receiver."""
self.run = False
if self._start_thread:
self._thread.join()
self._socket.setsockopt(zmq.LINGER, 0)
self._socket.close()
self._msg_receiver.stop()
83 changes: 83 additions & 0 deletions samples/apps/cap/py/autogencap/actor_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from abc import ABC, abstractmethod
from typing import Any, Optional, Tuple


class IActorConnector(ABC):
"""
Abstract base class for actor connectors. Each runtime will have a different implementation.
Obtain an instance of the correct connector from the runtime by calling the runtime's find_by_xyz
method.
"""

@abstractmethod
def send_txt_msg(self, msg: str) -> None:
"""
Send a text message to the actor.
Args:
msg (str): The text message to send.
"""
pass

@abstractmethod
def send_bin_msg(self, msg_type: str, msg: bytes) -> None:
"""
Send a binary message to the actor.
Args:
msg_type (str): The type of the binary message.
msg (bytes): The binary message to send.
"""
pass

@abstractmethod
def send_proto_msg(self, msg: Any) -> None:
"""
Send a protocol buffer message to the actor.
Args:
msg (Any): The protocol buffer message to send.
"""
pass

@abstractmethod
def send_recv_proto_msg(
self, msg: Any, num_attempts: int = 5
) -> Tuple[Optional[str], Optional[str], Optional[bytes]]:
"""
Send a protocol buffer message and receive a response from the actor.
Args:
msg (Any): The protocol buffer message to send.
num_attempts (int, optional): Number of attempts to send and receive. Defaults to 5.
Returns:
Tuple[Optional[str], Optional[str], Optional[bytes]]: A tuple containing the topic,
message type, and response message, or None if no response is received.
"""
pass

@abstractmethod
def send_recv_msg(
self, msg_type: str, msg: bytes, num_attempts: int = 5
) -> Tuple[Optional[str], Optional[str], Optional[bytes]]:
"""
Send a binary message and receive a response from the actor.
Args:
msg_type (str): The type of the binary message.
msg (bytes): The binary message to send.
num_attempts (int, optional): Number of attempts to send and receive. Defaults to 5.
Returns:
Tuple[Optional[str], Optional[str], Optional[bytes]]: A tuple containing the topic,
message type, and response message, or None if no response is received.
"""
pass

@abstractmethod
def close(self) -> None:
"""
Close the actor connector and release any resources.
"""
pass
86 changes: 79 additions & 7 deletions samples/apps/cap/py/autogencap/actor_runtime.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,108 @@
from abc import ABC, abstractmethod
from typing import List

from .Actor import Actor
from .ActorConnector import ActorConnector
from .actor_connector import IActorConnector
from .proto.CAP_pb2 import ActorInfo


class IMsgActor(ABC):
"""Abstract base class for message based actors."""

@abstractmethod
def on_connect(self, runtime: "IRuntime"):
"""Called when the actor connects to the runtime."""
pass

@abstractmethod
def on_txt_msg(self, msg: str, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming text messages."""
pass

@abstractmethod
def on_bin_msg(self, msg: bytes, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming binary messages."""
pass

@abstractmethod
def on_start(self):
"""Called when the actor starts."""
pass

@abstractmethod
def stop(self):
"""Stop the actor."""
pass

@abstractmethod
def dispatch_message(self, message):
"""Dispatch the received message based on its type."""
pass


class IMessageReceiver(ABC):
"""Abstract base class for message receivers. Implementations are runtime specific."""

@abstractmethod
def init(self, actor_name: str):
"""Initialize the message receiver."""
pass

@abstractmethod
def add_listener(self, topic: str):
"""Add a topic to the message receiver."""
pass

@abstractmethod
def get_message(self):
"""Retrieve a message from the runtime implementation."""
pass

@abstractmethod
def stop(self):
"""Stop the message receiver."""
pass


# Abstract base class for the runtime environment
class IRuntime(ABC):
"""Abstract base class for the actor runtime environment."""

@abstractmethod
def register(self, actor: IMsgActor):
"""Register an actor with the runtime."""
pass

@abstractmethod
def register(self, actor: Actor):
def get_new_msg_receiver(self) -> IMessageReceiver:
"""Create and return a new message receiver."""
pass

@abstractmethod
def connect(self):
"""Connect the runtime to the messaging system."""
pass

@abstractmethod
def disconnect(self):
"""Disconnect the runtime from the messaging system."""
pass

@abstractmethod
def find_by_topic(self, topic: str) -> ActorConnector:
def find_by_topic(self, topic: str) -> IActorConnector:
"""Find an actor connector by topic."""
pass

@abstractmethod
def find_by_name(self, name: str) -> ActorConnector:
def find_by_name(self, name: str) -> IActorConnector:
"""Find an actor connector by name."""
pass

@abstractmethod
def find_termination(self) -> ActorConnector:
def find_termination(self) -> IActorConnector:
"""Find the termination actor connector."""
pass

@abstractmethod
def find_by_name_regex(self, name_regex) -> List[ActorInfo]:
def find_by_name_regex(self, name_regex) -> List["ActorInfo"]:
"""Find actors by name using a regular expression."""
pass
13 changes: 0 additions & 13 deletions samples/apps/cap/py/autogencap/ag_adapter/AGActor.py

This file was deleted.

11 changes: 11 additions & 0 deletions samples/apps/cap/py/autogencap/ag_adapter/ag_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from autogencap.actor import Actor
from autogencap.constants import Termination_Topic
from autogencap.debug_log import Debug


class AGActor(Actor):
def on_start(self, runtime):
super().on_start(runtime)
str_topic = Termination_Topic
self._msg_receiver.add_listener(str_topic)
Debug(self.actor_name, f"subscribe to: {str_topic}")
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from autogen import Agent, ConversableAgent

from ..actor_runtime import IRuntime
from .AutoGenConnector import AutoGenConnector
from .autogen_connector import AutoGenConnector


class AG2CAP(ConversableAgent):
Expand Down
4 changes: 2 additions & 2 deletions samples/apps/cap/py/autogencap/ag_adapter/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from autogen import ConversableAgent

from ..DebugLog import Info, Warn
from .CAP2AG import CAP2AG
from ..debug_log import Info, Warn
from .cap_to_ag import CAP2AG


class Agent:
Expand Down
Loading

0 comments on commit 1c5baf0

Please sign in to comment.