-
Notifications
You must be signed in to change notification settings - Fork 0
/
camera_node.py
117 lines (99 loc) · 3.88 KB
/
camera_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import pickle
import threading
import time
from typing import Optional, Tuple
import numpy as np
import zmq
from cameras.camera import CameraDriver
DEFAULT_CAMERA_PORT = 5000
class ZMQClientCamera(CameraDriver):
"""A class representing a ZMQ client for a leader robot."""
def __init__(self, port: int = DEFAULT_CAMERA_PORT, host: str = "127.0.0.1"):
self._context = zmq.Context()
self._socket = self._context.socket(zmq.REQ)
self._socket.connect(f"tcp://{host}:{port}")
def read(
self,
img_size: Optional[Tuple[int, int]] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""Get the current state of the leader robot.
Returns:
T: The current state of the leader robot.
"""
# pack the image_size and send it to the server
send_message = pickle.dumps(img_size)
self._socket.send(send_message)
state_dict = pickle.loads(self._socket.recv())
return state_dict
class ZMQServerCamera:
def __init__(
self,
camera: CameraDriver,
port: int = DEFAULT_CAMERA_PORT,
host: str = "127.0.0.1",
):
self._camera = camera
self._context = zmq.Context()
self._socket = self._context.socket(zmq.REP)
addr = f"tcp://{host}:{port}"
debug_message = f"Camera Sever Binding to {addr}, Camera: {camera}"
print(debug_message)
self._timout_message = f"Timeout in Camera Server, Camera: {camera}"
self._socket.bind(addr)
self._stop_event = threading.Event()
def serve(self) -> None:
"""Serve the leader robot state over ZMQ."""
self._socket.setsockopt(zmq.RCVTIMEO, 1000) # Set timeout to 1000 ms
while not self._stop_event.is_set():
try:
message = self._socket.recv()
img_size = pickle.loads(message)
camera_read = self._camera.read(img_size)
self._socket.send(pickle.dumps(camera_read))
except zmq.Again:
print(self._timout_message)
# Timeout occurred, check if the stop event is set
def stop(self) -> None:
"""Signal the server to stop serving."""
self._stop_event.set()
class ZMQServerCameraFaster:
def __init__(
self,
camera: CameraDriver,
port: int = DEFAULT_CAMERA_PORT,
host: str = "127.0.0.1",
):
self._camera = camera
self._context = zmq.Context()
self._socket = self._context.socket(zmq.REP)
addr = f"tcp://{host}:{port}"
debug_message = f"Camera Sever Binding to {addr}, Camera: {camera}"
print(debug_message)
self._timout_message = f"Timeout in Camera Server, Camera: {camera}"
self._socket.bind(addr)
self._stop_event = threading.Event()
self.cam_buffer = None
self.refresh_interval = 1 / 30 # Refresh every 1/30 second
self.refresh_thread = threading.Thread(target=self._refresh_buffer)
self.refresh_thread.daemon = True
self.refresh_thread.start()
def serve(self) -> None:
"""Serve the leader robot state over ZMQ."""
self._socket.setsockopt(zmq.RCVTIMEO, 1000) # Set timeout to 1000 ms
while not self._stop_event.is_set():
try:
_ = self._socket.recv()
if self.cam_buffer is not None:
self._socket.send(pickle.dumps(self.cam_buffer))
else:
self._socket.send(b"Buffer is empty.")
except zmq.Again:
print(self._timout_message)
def _refresh_buffer(self):
"""Periodically refresh the buffer."""
while not self._stop_event.is_set():
self.cam_buffer = self._camera.read()
time.sleep(self.refresh_interval)
def stop(self) -> None:
"""Signal the server to stop serving."""
self._stop_event.set()