diff --git a/examples/torch_tensor.py b/examples/torch_tensor.py index 7d8f095..769f403 100644 --- a/examples/torch_tensor.py +++ b/examples/torch_tensor.py @@ -34,21 +34,25 @@ class Notify(MiddlewareCommunicator): @MiddlewareCommunicator.register("NativeObject", args.mware, "Notify", "/notify/test_native_exchange", carrier="", should_wait=True, load_torch_device='cpu') + @MiddlewareCommunicator.register("NativeObject", args.mware, "Notify", "/notify/test_native_exchange2", + carrier="", should_wait=True, load_torch_device='cpu') def exchange_object(self, msg): ret = {"message": msg, "torch_ones": torch.ones((2, 4), device='cpu'), "torch_zeros_cuda": torch.zeros((2, 3), device='cuda')} - return ret, + str_ret = f"this is your message transmitted as a string {msg}" + return ret, str_ret notify = Notify() if args.mode == "publish": notify.activate_communication(Notify.exchange_object, mode="publish") while True: - msg_object, = notify.exchange_object(input("Type your message: ")) - print("Method result:", msg_object) + msg_object, msg_str = notify.exchange_object(input("Type your message: ")) + print("Method result:", msg_object, msg_str) elif args.mode == "listen": notify.activate_communication(Notify.exchange_object, mode="listen") while True: - msg_object, = notify.exchange_object(None) - print("Method result:", msg_object) + msg_object, msg_str = notify.exchange_object(None) + if msg_object is not None: + print("Method result:", msg_object, msg_str) diff --git a/examples/vid_warhol.py b/examples/vid_warhol.py index 23c2863..e4a9109 100644 --- a/examples/vid_warhol.py +++ b/examples/vid_warhol.py @@ -103,7 +103,7 @@ def pil_to_cv2(img): else: return img - @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/darken_bg", carrier="tcp", width="$img_width", height="$img_height", rgb=True) + @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/darken_bg", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False) def darken_bg(self, img, color, img_width, img_height): img = self.cv2_to_pil(img) # composite image on top of a single-color image, effectively turning all transparent parts to that color @@ -112,7 +112,7 @@ def darken_bg(self, img, color, img_width, img_height): masked_img = self.pil_to_cv2(masked_img) return masked_img, - @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/color_bg_fg", carrier="tcp", width="$img_width", height="$img_height", rgb=True) + @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/color_bg_fg", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False) def color_bg_fg(self, img, bg_color, fg_color, img_width, img_height): img = self.cv2_to_pil(img) # change transparent background to bg_color and change everything non-transparent to fg_color @@ -122,11 +122,11 @@ def color_bg_fg(self, img, bg_color, fg_color, img_width, img_height): masked_img = self.pil_to_cv2(masked_img) return masked_img, - @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/white_to_color", carrier="tcp", width="$img_width", height="$img_height", rgb=True) + @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/white_to_color", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False) def white_to_color(self, img, color, img_width, img_height): img = self.cv2_to_pil(img).convert('RGBA') # change all colors close to white and non-transparent (alpha > 0) to be color - threshold = 50 + threshold = 20 dist = 10 arr = np.array(np.asarray(img)) r, g, b, a = np.rollaxis(arr, axis=-1) @@ -146,18 +146,22 @@ def white_to_color(self, img, color, img_width, img_height): def make_warhol_single(self, img, bg_color, fg_color, skin_color): # create a single warhol-serigraph-style image bg_fg_layer, = self.color_bg_fg(img, bg_color, fg_color, img_width=img.size[0], img_height=img.size[1]) - bg_fg_layer = self.cv2_to_pil(bg_fg_layer).convert('RGBA') + if bg_fg_layer is not None: + bg_fg_layer = self.cv2_to_pil(bg_fg_layer).convert('RGBA') temp_dark_image, = self.darken_bg(img, (0, 0, 0, 255), img_width=img.size[0], img_height=img.size[1]) - temp_dark_image = self.cv2_to_pil(temp_dark_image).convert('RGBA') - skin_mask, = self.white_to_color(temp_dark_image, (0, 0, 0, 0), img_width=temp_dark_image.size[0], img_height=temp_dark_image.size[1]) - skin_mask = self.cv2_to_pil(skin_mask).convert('RGBA') + skin_mask = None + if temp_dark_image is not None: + temp_dark_image = self.cv2_to_pil(temp_dark_image).convert('RGBA') + skin_mask, = self.white_to_color(temp_dark_image, (0, 0, 0, 0), img_width=temp_dark_image.size[0], img_height=temp_dark_image.size[1]) + if skin_mask is not None: + skin_mask = self.cv2_to_pil(skin_mask).convert('RGBA') skin_layer = Image.new('RGBA', img.size, skin_color) out = Image.composite(bg_fg_layer, skin_layer, skin_mask) return out - @MiddlewareCommunicator.register([["Image", "yarp", "Warholify", "/vid_warhol/warhol_" + str(x), {"carrier": "tcp", "width": "$img_width", "height": "$img_height"}] for x in range(9)]) - @MiddlewareCommunicator.register("Image", "Warholify", "/vid_warhol/warhol_combined", carrier="tcp", width="$img_width", height="$img_height") + @MiddlewareCommunicator.register([["Image", "yarp", "Warholify", "/vid_warhol/warhol_" + str(x), {"carrier": "mcast", "width": "$img_width", "height": "$img_height", "should_wait": False}] for x in range(9)]) + @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/warhol_combined", carrier="mcast", width="$img_width", height="$img_height", should_wait=False) def combine_to_one(self, warhols, img_width, img_height): warhols_new = [] for warhol in warhols: @@ -195,7 +199,7 @@ def warholify_images(self, img): return blank_img - @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/feed", carrier="tcp", width="$img_width", height="$img_height") + @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/feed", carrier="mcast", width="$img_width", height="$img_height", should_wait=False) def capture_vid(self, img_width, img_height): if self.vid_cap is None: self.vid_cap = cv2.VideoCapture(self.vid_src) @@ -215,7 +219,7 @@ def transform(self, img_width=640, img_height=480): return img - @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/final_img", carrier="tcp", width="$img_width", height="$img_height", rgb=True) + @MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/final_img", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False) def display(self, img, img_width, img_height): cv2.imshow("Warhol", img) cv2.waitKey(1) @@ -234,7 +238,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def parse_args(): parser = argparse.ArgumentParser() - parser.add_argument("--cfg-file", type=str, default="vid_warhol_cfg1.yml", + parser.add_argument("--cfg-file", type=str, help="The transmission mode configuration file") parser.add_argument("--vid-src", default=0, help="The video source. A string indicating a filename or an integer indicating the device id") @@ -245,7 +249,8 @@ def parse_args(): if __name__ == "__main__": args = parse_args() - print(args.cfg_file) - ConfigManager(args.cfg_file) + if args.cfg_file: + print(args.cfg_file) + ConfigManager(args.cfg_file) warholify = Warholify(vid_src=args.vid_src, img_width=args.img_width, img_height=args.img_height) warholify.run() \ No newline at end of file diff --git a/wrapify/__init__.py b/wrapify/__init__.py index b5ddc7a..6f4f0a0 100644 --- a/wrapify/__init__.py +++ b/wrapify/__init__.py @@ -1 +1,4 @@ -__version__ = 0.1 \ No newline at end of file +__version__ = 0.2 + +import logging +logging.getLogger().setLevel(logging.INFO) diff --git a/wrapify/config/manager.py b/wrapify/config/manager.py index 534f19e..c1b9e7c 100644 --- a/wrapify/config/manager.py +++ b/wrapify/config/manager.py @@ -1,4 +1,5 @@ import yaml +import logging from wrapify.utils import SingletonOptimized @@ -14,10 +15,10 @@ def __init__(self, config): """ if isinstance(config, str): self.config = self.__loadfile__(config) - print("loaded configuration", self.config) + logging.info(f"Loaded Wrapify configuration: {self.config}") elif isinstance(config, dict): self.config = config - print("loaded configuration", self.config) + logging.info(f"Loaded Wrapify configuration: {self.config}") else: self.config = [] diff --git a/wrapify/connect/listeners.py b/wrapify/connect/listeners.py index d593eed..78c7659 100644 --- a/wrapify/connect/listeners.py +++ b/wrapify/connect/listeners.py @@ -6,16 +6,23 @@ # TODO (fabawi): The watch dog is not running yet. Relying on lazy listening for now class ListenerWatchDog(metaclass=SingletonOptimized): - def __init__(self): + def __init__(self, repeats=10, inner_repeats=10): + self.repeats = repeats + self.inner_repeats = inner_repeats self.listener_ring = [] def add_listener(self, listener): self.listener_ring.append(listener) + def remove_listener(self, listener): + self.listener_ring.remove(listener) + def scan(self): - while self.listener_ring: + repeats = self.repeats + while self.listener_ring and (repeats > 0 | repeats <= -1): + repeats -= 1 for listener in self.listener_ring: - found_listener = listener.establish() + found_listener = listener.establish(repeats=self.inner_repeats) if found_listener: self.listener_ring.remove(listener) @@ -46,7 +53,20 @@ def __init__(self, name, in_port, carrier="", should_wait=True, **kwargs): self.should_wait = should_wait self.established = False - def establish(self, **kwargs): + def check_establishment(self, established): + if established: + self.established = True + if not self.should_wait: + ListenerWatchDog().remove_listener(self) + elif not self.should_wait: + ListenerWatchDog().scan() + if self in ListenerWatchDog().listener_ring: + established = False + else: + established = True + return established + + def establish(self, repeats=-1, **kwargs): raise NotImplementedError def listen(self): diff --git a/wrapify/connect/publishers.py b/wrapify/connect/publishers.py index bab881e..0f403dd 100644 --- a/wrapify/connect/publishers.py +++ b/wrapify/connect/publishers.py @@ -24,31 +24,52 @@ def scan(): # TODO (fabawi): The watch dog is not running yet. Relying on lazy publishing for now class PublisherWatchDog(metaclass=SingletonOptimized): - def __init__(self): + def __init__(self, repeats=10, inner_repeats=10): + self.repeats = repeats + self.inner_repeats = inner_repeats self.publisher_ring = [] def add_publisher(self, publisher): self.publisher_ring.append(publisher) + def remove_publisher(self, publisher): + self.publisher_ring.remove(publisher) + def scan(self): - while self.publisher_ring: + repeats = self.repeats + while self.publisher_ring and (repeats > 0 | repeats <= -1): + repeats -= 1 for publisher in self.publisher_ring: - found_listener = publisher.establish() - if found_listener: + found_publisher= publisher.establish(repeats=self.inner_repeats) + if found_publisher: self.publisher_ring.remove(publisher) # TODO (fabawi): Support multiple instance publishing of the same class, # currently only an issue with the output port naming convention class Publisher(object): - def __init__(self, name, out_port, carrier="", out_port_connect=None, **kwargs): + def __init__(self, name, out_port, carrier="", out_port_connect=None, should_wait=True, **kwargs): self.__name__ = name self.out_port = out_port self.carrier = carrier self.out_port_connect = out_port + ":out" if out_port_connect is None else out_port_connect + self.should_wait = should_wait self.established = False - def establish(self, **kwargs): + def check_establishment(self, established): + if established: + self.established = True + if not self.should_wait: + PublisherWatchDog().remove_publisher(self) + elif not self.should_wait: + PublisherWatchDog().scan() + if self in PublisherWatchDog().publisher_ring: + established = False + else: + established = True + return established + + def establish(self, repeats=-1, **kwargs): raise NotImplementedError def publish(self, obj): diff --git a/wrapify/connect/wrapper.py b/wrapify/connect/wrapper.py index 7352181..12f5e58 100644 --- a/wrapify/connect/wrapper.py +++ b/wrapify/connect/wrapper.py @@ -19,23 +19,19 @@ def __init__(self): self.activate_communication(getattr(self.__class__, key), mode=value) @classmethod - def register(cls, data_type, middleware, *args, **kwargs): + def register(cls, data_type, middleware=DEFAULT_COMMUNICATOR, *args, **kwargs): def encapsulate(func): # define the communication message type (single element) if isinstance(data_type, str): return_func_type = data_type + ":" + middleware - return_func_listen = lsn.Listeners.registry[return_func_type] - return_func_publish = pub.Publishers.registry[return_func_type] return_func_args = args return_func_kwargs = kwargs # define the communication message type (list for single return). NOTE: supports 1 layer depth only elif isinstance(data_type, list): - return_func_listen, return_func_publish, return_func_args, return_func_kwargs, return_func_type = [], [], [], [], [] + return_func_args, return_func_kwargs, return_func_type = [], [], [] for arg in data_type: data_spec = arg[0] + ":" + arg[1] - return_func_listen.append(lsn.Listeners.registry[data_spec]) - return_func_publish.append(pub.Publishers.registry[data_spec]) return_func_args.append([a for a in arg[2:] if not isinstance(a, dict)]) return_func_kwargs.append(*[a for a in arg[2:] if isinstance(a, dict)]) return_func_type.append(data_spec) @@ -50,15 +46,11 @@ def encapsulate(func): func_qualname = func.__qualname__ if func_qualname in cls.__registry: cls.__registry[func_qualname]["communicator"].append({ - "return_func_listen": return_func_listen, - "return_func_publish": return_func_publish, "return_func_args": return_func_args, "return_func_kwargs": return_func_kwargs, "return_func_type": return_func_type}) else: cls.__registry[func_qualname] = {"communicator": [{ - "return_func_listen": return_func_listen, - "return_func_publish": return_func_publish, "return_func_args": return_func_args, "return_func_kwargs": return_func_kwargs, "return_func_type": return_func_type}]} @@ -66,6 +58,9 @@ def encapsulate(func): @wraps(func) def wrapper(*wds, **kwds): # Triggers on calling the function + # execute the function as usual + if cls._MiddlewareCommunicator__registry[func.__qualname__]["mode"] is None: + return func(*wds, **kwds) instance_address = hex(id(wds[0])) instance_id = cls._MiddlewareCommunicator__registry[func.__qualname__]["__WRAPIFY_INSTANCES"].index(instance_address) + 1 @@ -76,12 +71,8 @@ def wrapper(*wds, **kwds): # Triggers on calling the function cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["args"] = wds cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["kwargs"] = kwd - # execute the function as usual - if cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["mode"] is None: - return func(*wds, **kwds) - # publishes the functions returns - elif cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["mode"] == "publish": + if cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["mode"] == "publish": if "wrapped_executor" not in cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["communicator"][0]: # instantiate the publishers cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["communicator"].reverse() @@ -90,14 +81,15 @@ def wrapper(*wds, **kwds): # Triggers on calling the function if isinstance(communicator["return_func_type"], str): new_args, new_kwargs = match_args( communicator["return_func_args"], communicator["return_func_kwargs"], wds[1:], kwd) - communicator["wrapped_executor"] = communicator["return_func_publish"](*new_args, **new_kwargs) + + communicator["wrapped_executor"] = pub.Publishers.registry[communicator["return_func_type"]](*new_args, **new_kwargs) # list for single return elif isinstance(communicator["return_func_type"], list): communicator["wrapped_executor"] = [] for comm_idx in range(len(communicator["return_func_type"])): new_args, new_kwargs = match_args( communicator["return_func_args"][comm_idx], communicator["return_func_kwargs"][comm_idx], wds[1:], kwd) - communicator["wrapped_executor"].append(communicator["return_func_publish"][comm_idx](*new_args, **new_kwargs)) + communicator["wrapped_executor"].append(pub.Publishers.registry[communicator["return_func_type"][comm_idx]](*new_args, **new_kwargs)) returns = func(*wds, **kwds) for ret_idx, ret in enumerate(returns): wrp_exec = cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["communicator"][ret_idx]["wrapped_executor"] @@ -119,13 +111,13 @@ def wrapper(*wds, **kwds): # Triggers on calling the function # single element if isinstance(communicator["return_func_type"], str): new_args, new_kwargs = match_args(communicator["return_func_args"], communicator["return_func_kwargs"], wds[1:], kwd) - communicator["wrapped_executor"] = communicator["return_func_listen"](*new_args, **new_kwargs) + communicator["wrapped_executor"] = lsn.Listeners.registry[communicator["return_func_type"]](*new_args, **new_kwargs) # list for single return elif isinstance(communicator["return_func_type"], list): communicator["wrapped_executor"] = [] for comm_idx in range(len(communicator["return_func_type"])): new_args, new_kwargs = match_args(communicator["return_func_args"][comm_idx], communicator["return_func_kwargs"][comm_idx], wds[1:], kwd) - communicator["wrapped_executor"].append(communicator["return_func_listen"][comm_idx](*new_args, **new_kwargs)) + communicator["wrapped_executor"].append(lsn.Listeners.registry[communicator["return_func_type"][comm_idx]](*new_args, **new_kwargs)) cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["last_results"] = [] for ret_idx in range(len(cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["communicator"])): wrp_exec = cls._MiddlewareCommunicator__registry[func.__qualname__ + instance_id]["communicator"][ret_idx]["wrapped_executor"] @@ -152,6 +144,8 @@ def wrapper(*wds, **kwds): # Triggers on calling the function return encapsulate def activate_communication(self, func, mode): + if isinstance(func, str): + func = getattr(self, func) entry = self.__registry.get(func.__qualname__, None) if entry is not None: instance_addr = hex(id(self)) diff --git a/wrapify/listeners/ros.py b/wrapify/listeners/ros.py index 007abbc..5c69024 100644 --- a/wrapify/listeners/ros.py +++ b/wrapify/listeners/ros.py @@ -1,5 +1,7 @@ import json +import logging import queue + import numpy as np import rospy import std_msgs.msg @@ -45,7 +47,7 @@ def _message_callback(self, data): try: self._queue.put(data.data, block=False) except queue.Full: - print(f"Discarding data because listener queue is full: {self.in_port}") + logging.warning(f"Discarding data because listener queue is full: {self.in_port}") @Listeners.register("Image", "ros") @@ -92,7 +94,7 @@ def _message_callback(self, data): try: self._queue.put((data.height, data.width, data.encoding, data.is_bigendian, data.data), block=False) except queue.Full: - print(f"Discarding data because listener queue is full: {self.in_port}") + logging.warning(f"Discarding data because listener queue is full: {self.in_port}") @Listeners.register("AudioChunk", "ros") @@ -129,7 +131,7 @@ def _message_callback(self, data): try: self._queue.put((data.height, data.width, data.encoding, data.is_bigendian, data.data), block=False) except queue.Full: - print(f"Discarding data because listener queue is full: {self.in_port}") + logging.warning(f"Discarding data because listener queue is full: {self.in_port}") @Listeners.register("Properties", "ros") diff --git a/wrapify/listeners/yarp.py b/wrapify/listeners/yarp.py index e3c685b..e99c6eb 100644 --- a/wrapify/listeners/yarp.py +++ b/wrapify/listeners/yarp.py @@ -1,5 +1,7 @@ +import logging import json import time + import numpy as np import yarp @@ -10,17 +12,29 @@ class YarpListener(Listener): - def __init__(self, name, in_port, carrier="", should_wait=True, **kwargs): - super().__init__(name, in_port, carrier=carrier, should_wait=should_wait, **kwargs) + def __init__(self, name, in_port, carrier="", **kwargs): + super().__init__(name, in_port, carrier=carrier, **kwargs) YarpMiddleware.activate() - def await_connection(self, port=None): + def await_connection(self, port=None, repeats=None): + connected = False if port is None: port = self.in_port - print("Waiting for input port:", port) - while not yarp.Network.exists(port): - time.sleep(0.2) - print("Connected to input port:", port) + logging.info(f"Waiting for input port: {port}") + if repeats is None: + if self.should_wait: + repeats = -1 + else: + repeats = 1 + + while repeats > 0 or repeats <= -1: + repeats -= 1 + connected = yarp.Network.exists(port) + if connected: + logging.info(f"Connected to input port: {port}") + break + time.sleep(0.2) + return connected def read_port(self, port): while True: @@ -34,23 +48,27 @@ def read_port(self, port): @Listeners.register("NativeObject", "yarp") class YarpNativeObjectListener(YarpListener): - def __init__(self, name, in_port, carrier="", should_wait=True, load_torch_device=None, **kwargs): - super().__init__(name, in_port, carrier=carrier, should_wait=should_wait, **kwargs) + def __init__(self, name, in_port, carrier="", load_torch_device=None, **kwargs): + super().__init__(name, in_port, carrier=carrier, **kwargs) self._json_object_hook = JsonDecodeHook(torch_device=load_torch_device).object_hook self._port = self._netconnect = None - ListenerWatchDog().add_listener(self) - - def establish(self): - self.await_connection() - self._port = yarp.BufferedPortBottle() - rnd_id = str(np.random.randint(100000, size=1)[0]) - self._port.open(self.in_port + ":in" + rnd_id) - self._netconnect = yarp.Network.connect(self.in_port, self.in_port + ":in" + rnd_id, self.carrier) - self.established = True + if not self.should_wait: + ListenerWatchDog().add_listener(self) + + def establish(self, repeats=None, **kwargs): + established = self.await_connection(repeats=repeats) + if established: + self._port = yarp.BufferedPortBottle() + rnd_id = str(np.random.randint(100000, size=1)[0]) + self._port.open(self.in_port + ":in" + rnd_id) + self._netconnect = yarp.Network.connect(self.in_port, self.in_port + ":in" + rnd_id, self.carrier) + return self.check_establishment(established) def listen(self): if not self.established: - self.establish() + established = self.establish() + if not established: + return None obj = self.read_port(self._port) return json.loads(obj.get(0).asString(), object_hook=self._json_object_hook) if obj is not None else None @@ -58,30 +76,34 @@ def listen(self): @Listeners.register("Image", "yarp") class YarpImageListener(YarpListener): - def __init__(self, name, in_port, carrier="", should_wait=True, width=-1, height=-1, rgb=True, fp=False, **kwargs): - super().__init__(name, in_port, carrier=carrier, should_wait=should_wait, **kwargs) + def __init__(self, name, in_port, carrier="", width=-1, height=-1, rgb=True, fp=False, **kwargs): + super().__init__(name, in_port, carrier=carrier, **kwargs) self.width = width self.height = height self.rgb = rgb self.fp = fp self._port = self._type = self._netconnect = None - ListenerWatchDog().add_listener(self) - - def establish(self): - self.await_connection() - if self.rgb: - self._port = yarp.BufferedPortImageRgbFloat() if self.fp else yarp.BufferedPortImageRgb() - else: - self._port = yarp.BufferedPortImageFloat() if self.fp else yarp.BufferedPortImageMono() - self._type = np.float32 if self.fp else np.uint8 - in_port_connect = f"{self.in_port}:in{np.random.randint(100000, size=1).item()}" - self._port.open(in_port_connect) - self._netconnect = yarp.Network.connect(self.in_port, in_port_connect, self.carrier) - self.established = True + if not self.should_wait: + ListenerWatchDog().add_listener(self) + + def establish(self, repeats=None, **kwargs): + established = self.await_connection(repeats=repeats) + if established: + if self.rgb: + self._port = yarp.BufferedPortImageRgbFloat() if self.fp else yarp.BufferedPortImageRgb() + else: + self._port = yarp.BufferedPortImageFloat() if self.fp else yarp.BufferedPortImageMono() + self._type = np.float32 if self.fp else np.uint8 + in_port_connect = f"{self.in_port}:in{np.random.randint(100000, size=1).item()}" + self._port.open(in_port_connect) + self._netconnect = yarp.Network.connect(self.in_port, in_port_connect, self.carrier) + return self.check_establishment(established) def listen(self): if not self.established: - self.establish() + established = self.establish() + if not established: + return None yarp_img = self.read_port(self._port) if yarp_img is None: return None @@ -109,27 +131,32 @@ def __del__(self): @Listeners.register("AudioChunk", "yarp") class YarpAudioChunkListener(YarpImageListener): - def __init__(self, name, in_port, carrier="", should_wait=True, channels=1, rate=44100, chunk=-1, **kwargs): - super().__init__(name, in_port, carrier=carrier, should_wait=should_wait, width=chunk, height=channels, rgb=False, fp=True, **kwargs) + def __init__(self, name, in_port, carrier="", channels=1, rate=44100, chunk=-1, **kwargs): + super().__init__(name, in_port, carrier=carrier, width=chunk, height=channels, rgb=False, fp=True, **kwargs) self.channels = channels self.rate = rate self.chunk = chunk self._dummy_sound = self._dummy_port = self._dummy_netconnect = None - ListenerWatchDog().add_listener(self) - - def establish(self): - self.await_connection(port=self.in_port + "_SND") - # create a dummy sound object for transmitting the sound props. This could be cleaner but left for future impl. - rnd_id = str(np.random.randint(100000, size=1)[0]) - self._dummy_port = yarp.Port() - self._dummy_port.open(self.in_port + "_SND:in" + rnd_id) - self._dummy_netconnect = yarp.Network.connect(self.in_port + "_SND", self.in_port + "_SND:in" + rnd_id, self.carrier) - super(YarpAudioChunkListener, self).establish() - self._dummy_sound = yarp.Sound() - self._dummy_port.read(self._dummy_sound) - self.rate = self._dummy_sound.getFrequency() - self.width = self.chunk = self._dummy_sound.getSamples() - self.height = self.channels = self._dummy_sound.getChannels() + if not self.should_wait: + ListenerWatchDog().add_listener(self) + + def establish(self, repeats=None, **kwargs): + established = self.await_connection(port=self.in_port + "_SND", repeats=repeats) + if established: + # create a dummy sound object for transmitting the sound props. This could be cleaner but left for future impl. + rnd_id = str(np.random.randint(100000, size=1)[0]) + self._dummy_port = yarp.Port() + self._dummy_port.open(self.in_port + "_SND:in" + rnd_id) + self._dummy_netconnect = yarp.Network.connect(self.in_port + "_SND", self.in_port + "_SND:in" + rnd_id, self.carrier) + established = self.check_establishment(established) + established_parent = super(YarpAudioChunkListener, self).establish(repeats=repeats) + if established_parent: + self._dummy_sound = yarp.Sound() + # self._dummy_port.read(self._dummy_sound) + # self.rate = self._dummy_sound.getFrequency() + # self.width = self.chunk = self._dummy_sound.getSamples() + # self.height = self.channels = self._dummy_sound.getChannels() + return established def listen(self): return super().listen(), self.rate @@ -142,7 +169,6 @@ def close(self): @Listeners.register("Properties", "yarp") class YarpPropertiesListener(YarpListener): - def __init__(self, name, in_port, **kwargs): super().__init__(name, in_port, **kwargs) raise NotImplementedError diff --git a/wrapify/middlewares/ros.py b/wrapify/middlewares/ros.py index 376ca65..54cec63 100644 --- a/wrapify/middlewares/ros.py +++ b/wrapify/middlewares/ros.py @@ -1,4 +1,6 @@ +import logging import atexit + import rospy from wrapify.utils import SingletonOptimized @@ -11,11 +13,11 @@ def activate(): ROSMiddleware() def __init__(self): - print("Initialising ROS middleware") + logging.info("Initialising ROS middleware") rospy.init_node('wrapify', anonymous=True, disable_signals=True) atexit.register(self.deinit) @staticmethod def deinit(): - print("Deinitialising ROS middleware") + logging.info("Deinitialising ROS middleware") rospy.signal_shutdown('Deinit') diff --git a/wrapify/middlewares/yarp.py b/wrapify/middlewares/yarp.py index 0e38324..ffd575d 100644 --- a/wrapify/middlewares/yarp.py +++ b/wrapify/middlewares/yarp.py @@ -1,4 +1,6 @@ +import logging import atexit + import yarp from wrapify.utils import SingletonOptimized @@ -11,11 +13,11 @@ def activate(): YarpMiddleware() def __init__(self): - print("Initialising YARP middleware") + logging.info("Initialising YARP middleware") yarp.Network.init() atexit.register(self.deinit) @staticmethod def deinit(): - print("Deinitialising YARP middleware") + logging.info("Deinitialising YARP middleware") yarp.Network.fini() diff --git a/wrapify/publishers/ros.py b/wrapify/publishers/ros.py index 20c252c..6a69436 100644 --- a/wrapify/publishers/ros.py +++ b/wrapify/publishers/ros.py @@ -1,6 +1,8 @@ +import logging import sys import json import time + import numpy as np import rospy import std_msgs.msg @@ -18,13 +20,24 @@ def __init__(self, name, out_port, carrier="", out_port_connect=None, queue_size ROSMiddleware.activate() self.queue_size = queue_size - def await_connection(self, publisher, out_port=None): + def await_connection(self, publisher, out_port=None, repeats=None): + connected = False if out_port is None: out_port = self.out_port - print("Waiting for topic subscriber:", out_port) - while publisher.get_num_connections() < 1: - time.sleep(0.02) - print("Topic subscriber connected:", out_port) + logging.info(f"Waiting for topic subscriber: {out_port}") + if repeats is None: + if self.should_wait: + repeats = -1 + else: + repeats = 1 + while repeats > 0 or repeats <= -1: + repeats -= 1 + connected = publisher.get_num_connections() < 1 + if connected: + break + time.sleep(0.02) + logging.info(f"Topic subscriber connected: {out_port}") + return connected @Publishers.register("NativeObject", "ros") @@ -33,16 +46,21 @@ class ROSNativeObjectPublisher(ROSPublisher): def __init__(self, name, out_port, carrier="", out_port_connect=None, queue_size=5, **kwargs): super().__init__(name, out_port, carrier=carrier, out_port_connect=out_port_connect, queue_size=queue_size, **kwargs) self._publisher = None - PublisherWatchDog().add_publisher(self) + if not self.should_wait: + PublisherWatchDog().add_publisher(self) - def establish(self): + def establish(self, repeats=None, **kwargs): self._publisher = rospy.Publisher(self.out_port, std_msgs.msg.String, queue_size=self.queue_size) - self.await_connection(self._publisher) - self.established = True + established = self.await_connection(self._publisher, repeats=repeats) + return self.check_establishment(established) def publish(self, obj): if not self.established: - self.establish() + established = self.establish() + if not established: + return + else: + time.sleep(0.2) obj_str = json.dumps(obj, cls=JsonEncoder) self._publisher.publish(obj_str) @@ -63,16 +81,21 @@ def __init__(self, name, out_port, carrier="", out_port_connect=None, width=-1, self._encoding = 'bgr8' if self.rgb else 'mono8' self._type = np.uint8 self._publisher = None - PublisherWatchDog().add_publisher(self) + if not self.should_wait: + PublisherWatchDog().add_publisher(self) - def establish(self): + def establish(self, repeats=None, **kwargs): self._publisher = rospy.Publisher(self.out_port, sensor_msgs.msg.Image, queue_size=self.queue_size) - self.await_connection(self._publisher) - self.established = True + established = self.await_connection(self._publisher) + return self.check_establishment(established) def publish(self, img): if not self.established: - self.establish() + established = self.establish() + if not established: + return + else: + time.sleep(0.2) if 0 < self.width != img.shape[1] or 0 < self.height != img.shape[0] or not ((img.ndim == 2 and not self.rgb) or (img.ndim == 3 and self.rgb and img.shape[2] == 3)): raise ValueError("Incorrect image shape for publisher") img = np.require(img, dtype=self._type, requirements='C') @@ -96,16 +119,21 @@ def __init__(self, name, out_port, carrier="", out_port_connect=None, channels=1 self.rate = rate self.chunk = chunk self._publisher = None - PublisherWatchDog().add_publisher(self) + if not self.should_wait: + PublisherWatchDog().add_publisher(self) - def establish(self, **kwargs): + def establish(self, repeats=None, **kwargs): self._publisher = rospy.Publisher(self.out_port, sensor_msgs.msg.Image, queue_size=self.queue_size) - self.await_connection(self._publisher) - self.established = True + established = self.await_connection(self._publisher) + return self.check_establishment(established) def publish(self, aud): if not self.established: - self.establish() + established = self.establish() + if not established: + return + else: + time.sleep(0.2) aud, rate = aud if rate != self.rate: raise ValueError("Incorrect audio rate for publisher") @@ -127,7 +155,6 @@ def publish(self, aud): @Publishers.register("Properties", "ros") class ROSPropertiesPublisher(ROSPublisher): - def __init__(self, name, out_port, **kwargs): super().__init__(name, out_port, **kwargs) raise NotImplementedError diff --git a/wrapify/publishers/yarp.py b/wrapify/publishers/yarp.py index d4abd09..6a2d39f 100644 --- a/wrapify/publishers/yarp.py +++ b/wrapify/publishers/yarp.py @@ -1,5 +1,7 @@ +import logging import json import time + import numpy as np import yarp @@ -14,13 +16,24 @@ def __init__(self, name, out_port, carrier="", out_port_connect=None, **kwargs): super().__init__(name, out_port, carrier=carrier, out_port_connect=out_port_connect, **kwargs) YarpMiddleware.activate() - def await_connection(self, port, out_port=None): + def await_connection(self, port, out_port=None, repeats=None): + connected = False if out_port is None: out_port = self.out_port - print("Waiting for output connection:", out_port) - while port.getOutputCount() < 1: - time.sleep(0.02) - print("Output connection established:", out_port) + logging.info(f"Waiting for output connection: {out_port}") + if repeats is None: + if self.should_wait: + repeats = -1 + else: + repeats = 1 + while repeats > 0 or repeats <= -1: + repeats -= 1 + connected = port.getOutputCount() < 1 + if connected: + break + time.sleep(0.02) + logging.info(f"Output connection established: {out_port}") + return connected @Publishers.register("NativeObject", "yarp") @@ -39,18 +52,23 @@ def __init__(self, name, out_port, carrier="", out_port_connect=None, **kwargs): """ super().__init__(name, out_port, carrier=carrier, out_port_connect=out_port_connect, **kwargs) self._port = self._netconnect = None - PublisherWatchDog().add_publisher(self) + if not self.should_wait: + PublisherWatchDog().add_publisher(self) - def establish(self): + def establish(self, repeats=None, **kwargs): self._port = yarp.BufferedPortBottle() self._port.open(self.out_port) self._netconnect = yarp.Network.connect(self.out_port, self.out_port_connect, self.carrier) - self.await_connection(self._port) - self.established = True + established = self.await_connection(self._port, repeats=repeats) + return self.check_establishment(established) def publish(self, obj): if not self.established: - self.establish() + established = self.establish() + if not established: + return + else: + time.sleep(0.2) obj = json.dumps(obj, cls=JsonEncoder) oobj = self._port.prepare() oobj.clear() @@ -79,9 +97,10 @@ def __init__(self, name, out_port, carrier="", out_port_connect=None, width=-1, self.rgb = rgb self.fp = fp self._port = self._type = self._netconnect = None - PublisherWatchDog().add_publisher(self) + if not self.should_wait: + PublisherWatchDog().add_publisher(self) - def establish(self): + def establish(self, repeats=None, **kwargs): if self.rgb: self._port = yarp.BufferedPortImageRgbFloat() if self.fp else yarp.BufferedPortImageRgb() else: @@ -89,8 +108,8 @@ def establish(self): self._type = np.float32 if self.fp else np.uint8 self._port.open(self.out_port) self._netconnect = yarp.Network.connect(self.out_port, self.out_port_connect, self.carrier) - self.await_connection(self._port) - self.established = True + established = self.await_connection(self._port, repeats=repeats) + return self.check_establishment(established) def publish(self, img): """ @@ -99,7 +118,11 @@ def publish(self, img): :return: None """ if not self.established: - self.establish() + established = self.establish() + if not established: + return + else: + time.sleep(0.2) if 0 < self.width != img.shape[1] or 0 < self.height != img.shape[0] or not ((img.ndim == 2 and not self.rgb) or (img.ndim == 3 and self.rgb and img.shape[2] == 3)): raise ValueError("Incorrect image shape for publisher") img = np.require(img, dtype=self._type, requirements='C') @@ -142,9 +165,10 @@ def __init__(self, name, out_port, carrier="", out_port_connect=None, channels=1 self.rate = rate self.chunk = chunk self._dummy_sound = self._dummy_port = self._dummy_netconnect = None - PublisherWatchDog().add_publisher(self) + if not self.should_wait: + PublisherWatchDog().add_publisher(self) - def establish(self): + def establish(self, repeats=None, **kwargs): # create a dummy sound object for transmitting the sound props. This could be cleaner but left for future impl. self._dummy_port = yarp.Port() self._dummy_port.open(self.out_port + "_SND") @@ -152,9 +176,11 @@ def establish(self): self._dummy_sound = yarp.Sound() self._dummy_sound.setFrequency(self.rate) self._dummy_sound.resize(self.chunk, self.channels) - self.await_connection(self._dummy_port, out_port=self.out_port + "_SND") - super(YarpAudioChunkPublisher, self).establish() - self._dummy_port.write(self._dummy_sound) + established = self.await_connection(self._dummy_port, out_port=self.out_port + "_SND") + if established: + super(YarpAudioChunkPublisher, self).establish(repeats=repeats) + self._dummy_port.write(self._dummy_sound) + return self.check_establishment(established) def publish(self, aud): """ @@ -163,7 +189,11 @@ def publish(self, aud): :return: None """ if not self.established: - self.establish() + established = self.establish() + if not established: + return + else: + time.sleep(0.2) aud, _ = aud if aud is not None: oaud = self._port.prepare()