Skip to content

Commit

Permalink
Merge pull request #15 from fabawi/dev
Browse files Browse the repository at this point in the history
Introduced mechanism for looping through uninitialized connections without blocking others
  • Loading branch information
fabawi committed Aug 19, 2022
2 parents 1b32f25 + 78f2a7e commit 531c972
Show file tree
Hide file tree
Showing 13 changed files with 291 additions and 154 deletions.
14 changes: 9 additions & 5 deletions examples/torch_tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,25 @@ class Notify(MiddlewareCommunicator):

@MiddlewareCommunicator.register("NativeObject", args.mware, "Notify", "/notify/test_native_exchange",
carrier="", should_wait=True, load_torch_device='cpu')
@MiddlewareCommunicator.register("NativeObject", args.mware, "Notify", "/notify/test_native_exchange2",
carrier="", should_wait=True, load_torch_device='cpu')
def exchange_object(self, msg):
ret = {"message": msg,
"torch_ones": torch.ones((2, 4), device='cpu'),
"torch_zeros_cuda": torch.zeros((2, 3), device='cuda')}
return ret,
str_ret = f"this is your message transmitted as a string {msg}"
return ret, str_ret

notify = Notify()

if args.mode == "publish":
notify.activate_communication(Notify.exchange_object, mode="publish")
while True:
msg_object, = notify.exchange_object(input("Type your message: "))
print("Method result:", msg_object)
msg_object, msg_str = notify.exchange_object(input("Type your message: "))
print("Method result:", msg_object, msg_str)
elif args.mode == "listen":
notify.activate_communication(Notify.exchange_object, mode="listen")
while True:
msg_object, = notify.exchange_object(None)
print("Method result:", msg_object)
msg_object, msg_str = notify.exchange_object(None)
if msg_object is not None:
print("Method result:", msg_object, msg_str)
35 changes: 20 additions & 15 deletions examples/vid_warhol.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def pil_to_cv2(img):
else:
return img

@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/darken_bg", carrier="tcp", width="$img_width", height="$img_height", rgb=True)
@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/darken_bg", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False)
def darken_bg(self, img, color, img_width, img_height):
img = self.cv2_to_pil(img)
# composite image on top of a single-color image, effectively turning all transparent parts to that color
Expand All @@ -112,7 +112,7 @@ def darken_bg(self, img, color, img_width, img_height):
masked_img = self.pil_to_cv2(masked_img)
return masked_img,

@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/color_bg_fg", carrier="tcp", width="$img_width", height="$img_height", rgb=True)
@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/color_bg_fg", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False)
def color_bg_fg(self, img, bg_color, fg_color, img_width, img_height):
img = self.cv2_to_pil(img)
# change transparent background to bg_color and change everything non-transparent to fg_color
Expand All @@ -122,11 +122,11 @@ def color_bg_fg(self, img, bg_color, fg_color, img_width, img_height):
masked_img = self.pil_to_cv2(masked_img)
return masked_img,

@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/white_to_color", carrier="tcp", width="$img_width", height="$img_height", rgb=True)
@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/white_to_color", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False)
def white_to_color(self, img, color, img_width, img_height):
img = self.cv2_to_pil(img).convert('RGBA')
# change all colors close to white and non-transparent (alpha > 0) to be color
threshold = 50
threshold = 20
dist = 10
arr = np.array(np.asarray(img))
r, g, b, a = np.rollaxis(arr, axis=-1)
Expand All @@ -146,18 +146,22 @@ def white_to_color(self, img, color, img_width, img_height):
def make_warhol_single(self, img, bg_color, fg_color, skin_color):
# create a single warhol-serigraph-style image
bg_fg_layer, = self.color_bg_fg(img, bg_color, fg_color, img_width=img.size[0], img_height=img.size[1])
bg_fg_layer = self.cv2_to_pil(bg_fg_layer).convert('RGBA')
if bg_fg_layer is not None:
bg_fg_layer = self.cv2_to_pil(bg_fg_layer).convert('RGBA')
temp_dark_image, = self.darken_bg(img, (0, 0, 0, 255), img_width=img.size[0], img_height=img.size[1])
temp_dark_image = self.cv2_to_pil(temp_dark_image).convert('RGBA')
skin_mask, = self.white_to_color(temp_dark_image, (0, 0, 0, 0), img_width=temp_dark_image.size[0], img_height=temp_dark_image.size[1])
skin_mask = self.cv2_to_pil(skin_mask).convert('RGBA')
skin_mask = None
if temp_dark_image is not None:
temp_dark_image = self.cv2_to_pil(temp_dark_image).convert('RGBA')
skin_mask, = self.white_to_color(temp_dark_image, (0, 0, 0, 0), img_width=temp_dark_image.size[0], img_height=temp_dark_image.size[1])
if skin_mask is not None:
skin_mask = self.cv2_to_pil(skin_mask).convert('RGBA')

skin_layer = Image.new('RGBA', img.size, skin_color)
out = Image.composite(bg_fg_layer, skin_layer, skin_mask)
return out

@MiddlewareCommunicator.register([["Image", "yarp", "Warholify", "/vid_warhol/warhol_" + str(x), {"carrier": "tcp", "width": "$img_width", "height": "$img_height"}] for x in range(9)])
@MiddlewareCommunicator.register("Image", "Warholify", "/vid_warhol/warhol_combined", carrier="tcp", width="$img_width", height="$img_height")
@MiddlewareCommunicator.register([["Image", "yarp", "Warholify", "/vid_warhol/warhol_" + str(x), {"carrier": "mcast", "width": "$img_width", "height": "$img_height", "should_wait": False}] for x in range(9)])
@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/warhol_combined", carrier="mcast", width="$img_width", height="$img_height", should_wait=False)
def combine_to_one(self, warhols, img_width, img_height):
warhols_new = []
for warhol in warhols:
Expand Down Expand Up @@ -195,7 +199,7 @@ def warholify_images(self, img):

return blank_img

@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/feed", carrier="tcp", width="$img_width", height="$img_height")
@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/feed", carrier="mcast", width="$img_width", height="$img_height", should_wait=False)
def capture_vid(self, img_width, img_height):
if self.vid_cap is None:
self.vid_cap = cv2.VideoCapture(self.vid_src)
Expand All @@ -215,7 +219,7 @@ def transform(self, img_width=640, img_height=480):
return img


@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/final_img", carrier="tcp", width="$img_width", height="$img_height", rgb=True)
@MiddlewareCommunicator.register("Image", "yarp", "Warholify", "/vid_warhol/final_img", carrier="mcast", width="$img_width", height="$img_height", rgb=True, should_wait=False)
def display(self, img, img_width, img_height):
cv2.imshow("Warhol", img)
cv2.waitKey(1)
Expand All @@ -234,7 +238,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--cfg-file", type=str, default="vid_warhol_cfg1.yml",
parser.add_argument("--cfg-file", type=str,
help="The transmission mode configuration file")
parser.add_argument("--vid-src", default=0,
help="The video source. A string indicating a filename or an integer indicating the device id")
Expand All @@ -245,7 +249,8 @@ def parse_args():

if __name__ == "__main__":
args = parse_args()
print(args.cfg_file)
ConfigManager(args.cfg_file)
if args.cfg_file:
print(args.cfg_file)
ConfigManager(args.cfg_file)
warholify = Warholify(vid_src=args.vid_src, img_width=args.img_width, img_height=args.img_height)
warholify.run()
5 changes: 4 additions & 1 deletion wrapify/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
__version__ = 0.1
__version__ = 0.2

import logging
logging.getLogger().setLevel(logging.INFO)
5 changes: 3 additions & 2 deletions wrapify/config/manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import yaml
import logging

from wrapify.utils import SingletonOptimized

Expand All @@ -14,10 +15,10 @@ def __init__(self, config):
"""
if isinstance(config, str):
self.config = self.__loadfile__(config)
print("loaded configuration", self.config)
logging.info(f"Loaded Wrapify configuration: {self.config}")
elif isinstance(config, dict):
self.config = config
print("loaded configuration", self.config)
logging.info(f"Loaded Wrapify configuration: {self.config}")
else:
self.config = []

Expand Down
28 changes: 24 additions & 4 deletions wrapify/connect/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@

# TODO (fabawi): The watch dog is not running yet. Relying on lazy listening for now
class ListenerWatchDog(metaclass=SingletonOptimized):
def __init__(self):
def __init__(self, repeats=10, inner_repeats=10):
self.repeats = repeats
self.inner_repeats = inner_repeats
self.listener_ring = []

def add_listener(self, listener):
self.listener_ring.append(listener)

def remove_listener(self, listener):
self.listener_ring.remove(listener)

def scan(self):
while self.listener_ring:
repeats = self.repeats
while self.listener_ring and (repeats > 0 | repeats <= -1):
repeats -= 1
for listener in self.listener_ring:
found_listener = listener.establish()
found_listener = listener.establish(repeats=self.inner_repeats)
if found_listener:
self.listener_ring.remove(listener)

Expand Down Expand Up @@ -46,7 +53,20 @@ def __init__(self, name, in_port, carrier="", should_wait=True, **kwargs):
self.should_wait = should_wait
self.established = False

def establish(self, **kwargs):
def check_establishment(self, established):
if established:
self.established = True
if not self.should_wait:
ListenerWatchDog().remove_listener(self)
elif not self.should_wait:
ListenerWatchDog().scan()
if self in ListenerWatchDog().listener_ring:
established = False
else:
established = True
return established

def establish(self, repeats=-1, **kwargs):
raise NotImplementedError

def listen(self):
Expand Down
33 changes: 27 additions & 6 deletions wrapify/connect/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,52 @@ def scan():

# TODO (fabawi): The watch dog is not running yet. Relying on lazy publishing for now
class PublisherWatchDog(metaclass=SingletonOptimized):
def __init__(self):
def __init__(self, repeats=10, inner_repeats=10):
self.repeats = repeats
self.inner_repeats = inner_repeats
self.publisher_ring = []

def add_publisher(self, publisher):
self.publisher_ring.append(publisher)

def remove_publisher(self, publisher):
self.publisher_ring.remove(publisher)

def scan(self):
while self.publisher_ring:
repeats = self.repeats
while self.publisher_ring and (repeats > 0 | repeats <= -1):
repeats -= 1
for publisher in self.publisher_ring:
found_listener = publisher.establish()
if found_listener:
found_publisher= publisher.establish(repeats=self.inner_repeats)
if found_publisher:
self.publisher_ring.remove(publisher)


# TODO (fabawi): Support multiple instance publishing of the same class,
# currently only an issue with the output port naming convention
class Publisher(object):
def __init__(self, name, out_port, carrier="", out_port_connect=None, **kwargs):
def __init__(self, name, out_port, carrier="", out_port_connect=None, should_wait=True, **kwargs):
self.__name__ = name
self.out_port = out_port
self.carrier = carrier
self.out_port_connect = out_port + ":out" if out_port_connect is None else out_port_connect
self.should_wait = should_wait
self.established = False

def establish(self, **kwargs):
def check_establishment(self, established):
if established:
self.established = True
if not self.should_wait:
PublisherWatchDog().remove_publisher(self)
elif not self.should_wait:
PublisherWatchDog().scan()
if self in PublisherWatchDog().publisher_ring:
established = False
else:
established = True
return established

def establish(self, repeats=-1, **kwargs):
raise NotImplementedError

def publish(self, obj):
Expand Down
Loading

0 comments on commit 531c972

Please sign in to comment.