Add start_camera, stop_camera services for compatibility with power-saver #11

Merged · 7 commits · Oct 2, 2024
turtlebot4_vision_tutorials/setup.py (1 addition, 0 deletions)

@@ -16,6 +16,7 @@
         (os.path.join('share', package_name, 'models'), glob('models/*.blob')),
         (os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py')),
         (os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
+        (os.path.join('share', package_name, 'templates'), glob('templates/*.py')),
     ],
     install_requires=['setuptools'],
     zip_safe=True,

@@ -90,14 +90,14 @@ def pd_postprocess(inference, crop_region):
         xnorm.append(xn)
         ynorm.append(yn)
         scores.append(inference[3*i+2])
-        x.append(int(xmin + xn * size))
-        y.append(int(ymin + yn * size))
+        x.append(int(xmin + xn * size))
+        y.append(int(ymin + yn * size))
 
     next_crop_region = determine_crop_region(scores, x, y) if ${_smart_crop} else init_crop_region
     return x, y, xnorm, ynorm, scores, next_crop_region
 
 node.warn("Processing node started")
-# Defines the default crop region (pads the full image from both sides to make it a square image)
+# Defines the default crop region (pads the full image from both sides to make it a square image)
 # Used when the algorithm cannot reliably determine the crop region from the previous frame.
 init_crop_region = ${_init_crop_region}
 crop_region = init_crop_region

@@ -10,13 +10,16 @@

 from turtlebot4_vision_tutorials.FPS import FPS
 
-SCRIPT_DIR = Path(__file__).resolve().parent
-MOVENET_LIGHTNING_MODEL = os.path.join(SCRIPT_DIR.parent,
+from ament_index_python.packages import get_package_share_directory
+
+SHARE_DIR = get_package_share_directory('turtlebot4_vision_tutorials')
+MOVENET_LIGHTNING_MODEL = os.path.join(SHARE_DIR,
                                        "models",
                                        "movenet_singlepose_lightning_U8_transpose.blob")
-MOVENET_THUNDER_MODEL = os.path.join(SCRIPT_DIR.parent,
+MOVENET_THUNDER_MODEL = os.path.join(SHARE_DIR,
                                      "models",
                                      "movenet_singlepose_thunder_U8_transpose.blob")
+TEMPLATE_DIR = os.path.join(SHARE_DIR, 'templates')
 
 # Dictionary that maps from joint names to keypoint indices.
 KEYPOINT_DICT = {

@@ -308,7 +311,7 @@ def build_processing_script(self):
         which is a python template
         '''
         # Read the template
-        with open(os.path.join(SCRIPT_DIR, 'template_processing_script.py'), 'r') as file:
+        with open(os.path.join(TEMPLATE_DIR, 'template_processing_script.py'), 'r') as file:
             template = Template(file.read())
 
         # Perform the substitution

@@ -346,7 +349,6 @@ def pd_postprocess(self, inference):
         return body
 
     def next_frame(self):
-
         self.fps.update()
 
         # Get the device camera frame if wanted
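Note on the path changes above: the on-device processing script is generated by substituting values into a Python template, so after this change the template has to be installed into the package share directory, where get_package_share_directory() can find it (hence the new setup.py data_files entry). A minimal sketch of the resolve-and-substitute flow; the two substitution values below are illustrative assumptions, not the node's real configuration:

    import os
    from string import Template

    from ament_index_python.packages import get_package_share_directory

    # Resolve the installed template, as MovenetDepthaiEdge.py now does
    SHARE_DIR = get_package_share_directory('turtlebot4_vision_tutorials')
    TEMPLATE_DIR = os.path.join(SHARE_DIR, 'templates')

    with open(os.path.join(TEMPLATE_DIR, 'template_processing_script.py'), 'r') as file:
        template = Template(file.read())

    # Fill ${_smart_crop} and ${_init_crop_region}; safe_substitute leaves any
    # other placeholders in the template untouched
    script = template.safe_substitute(
        _smart_crop='True',
        _init_crop_region="{'xmin': 0, 'ymin': 0, 'xmax': 1, 'ymax': 1, 'size': 1}",
    )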

@@ -24,13 +24,28 @@
 import rclpy
 from rclpy.node import Node
 from std_msgs.msg import String as string_msg
+from std_srvs.srv import Trigger
 from sensor_msgs.msg import Image
 
 from irobot_create_msgs.msg import LightringLeds
 from turtlebot4_vision_tutorials.MovenetDepthaiEdge import MovenetDepthai
 
 POSE_DETECT_ENABLE = True
 
+# keys:
+#   (right, left), where each value is the angle with the horizontal,
+#   quantized into 8 regions
+# Human facing camera
+#
+#   R 0   0 L
+#    1     7
+#       O
+#  2 +--+--+ 6
+#    | \|/ |
+#  3 | +++ | 5
+#    |     |
+#  4 |     | 4
+#   -+     +-
 SEMAPHORE_FLAG = {
     (4, 7): 'E', (4, 6): 'F', (4, 5): 'G', (2, 3): 'H',
     (3, 4): 'A', (2, 4): 'B', (1, 4): 'C', (0, 4): 'D',
@@ -39,8 +54,43 @@
     (2, 7): 'Q', (2, 6): 'R', (2, 5): 'S', (1, 0): 'T',
     (1, 7): 'U', (0, 5): 'V', (7, 6): 'W', (7, 5): 'X',
     (1, 6): 'Y', (5, 6): 'Z',
+
+    # Extended semaphore signals
+    (0, 7): '#', (4, 4): ' ', (1, 5): 'DEL',
+
+    # Not official semaphore, but allows the "touchdown" referee signal
+    # with both arms held vertically above the head
+    (0, 0): 'TD',
 }
+
+# Semaphore letters that indicate to drive left
+LEFT_LETTERS = [
+    'B',
+    'C',
+    'H',
+    'I',
+    'O',
+    'S',
+]
+
+# Semaphore letters that indicate to drive right
+RIGHT_LETTERS = [
+    'E',
+    'F',
+    'M',
+    'W',
+    'X',
+    'Z',
+]
+
+# Semaphore letters that indicate to drive forward
+FORWARD_LETTERS = [
+    'T',
+    'U',
+    '#',
+    'TD',
+]
 
 KEYPOINT_DICT = {
     'nose': 0,
     'left_eye': 1,
@@ -113,6 +163,20 @@ def __init__(self):
             10,
         )
 
+        self.is_paused_ = False
+
+        self.start_camera_srv = self.create_service(
+            Trigger,
+            'oakd/start_camera',
+            self.handle_start_camera
+        )
+
+        self.stop_camera_srv = self.create_service(
+            Trigger,
+            'oakd/stop_camera',
+            self.handle_stop_camera
+        )
+
         timer_period = 0.0833  # seconds
         self.timer = self.create_timer(timer_period, self.pose_detect)
 
@@ -133,6 +197,9 @@ def __init__(self):
         # self.button_1_function()
 
     def pose_detect(self):
+        if self.is_paused_:
+            return
+
         if not (POSE_DETECT_ENABLE):
             return
         # Run movenet on next frame
@@ -168,7 +235,7 @@ def pose_detect(self):
             # Stamp the message with the current time
             flag_msg.data = letter
             self.semaphore_flag_publisher.publish(flag_msg)
-            self.get_logger().info('Letter detected is: ' + letter)
+            self.get_logger().info(f'Letter detected is: {letter}')
 
         self.autonomous_lights()
 
@@ -197,22 +264,24 @@ def angle_with_y(v):
         right_pose = int((right_arm_angle + 202.5) / 45) % 8
         left_pose = int((left_arm_angle + 202.5) / 45) % 8
         letter = SEMAPHORE_FLAG.get((right_pose, left_pose), None)
+
         dir_temp = Dir.STOP
-        if right_pose == 2 and (left_pose > 3 and left_pose < 6):
+        if letter in LEFT_LETTERS:
             dir_temp = Dir.LEFT
-            self.lights_blue_ = True
-        elif left_pose == 6 and (right_pose > 2 and right_pose < 5):
+        elif letter in RIGHT_LETTERS:
             dir_temp = Dir.RIGHT
-        elif (left_pose > 6 or left_pose == 0) and right_pose < 2:
+        elif letter in FORWARD_LETTERS:
             dir_temp = Dir.STRAIGHT
-        if dir_temp != Dir.STOP:
-            self.lights_blue_ = True
 
         if dir_temp == self.dir:
             self.dir_confirm += 1
         else:
             self.dir_confirm = 1
             self.dir = dir_temp
-        if self.dir_confirm > 4:
+
+        self.lights_blue_ = self.dir != Dir.STOP
+
+        if self.dir_confirm >= 3:
             cmd_vel_msg = Twist()
             if self.dir == Dir.LEFT:
                 cmd_vel_msg.angular.z = 0.4
@@ -278,6 +347,32 @@ def autonomous_lights(self):
         # Publish the message
         self.lightring_publisher.publish(lightring_msg)
 
+    def handle_start_camera(self, req, resp):
+        if self.is_paused_:
+            self.is_paused_ = False
+            self.lights_blue_ = False
+            self.pose = MovenetDepthai(input_src='rgb',
+                                       model='thunder',
+                                       score_thresh=0.3,
+                                       crop=False,        # equivalent to the original `not 'store_true'`
+                                       smart_crop=False,  # equivalent to the original `not 'store_true'`
+                                       internal_frame_height=432)
+            resp.success = True
+        else:
+            resp.message = 'Device already running'
+        return resp
+
+    def handle_stop_camera(self, req, resp):
+        if not self.is_paused_:
+            self.is_paused_ = True
+            self.lights_blue_ = False
+            self.pose.device.close()
+            self.pose = None
+            resp.success = True
+        else:
+            resp.message = 'Device already stopped'
+        return resp
+
 
 def main(args=None):
     rclpy.init(args=args)
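For reference, the octant lookup that feeds SEMAPHORE_FLAG above: each arm's angle with the horizontal is offset by 202.5 degrees, split into 45-degree bins, and wrapped into regions 0..7. A standalone sketch with illustrative angles (the real inputs come from angle_with_y applied to the arm keypoints):

    def quantize(angle_deg: float) -> int:
        # Offset by 202.5 degrees to align the bins, then wrap into 0..7
        return int((angle_deg + 202.5) / 45) % 8

    for angle in (-180.0, -90.0, 0.0, 90.0, 170.0):
        print(angle, '->', quantize(angle))
    # -180.0 -> 0, -90.0 -> 2, 0.0 -> 4, 90.0 -> 6, 170.0 -> 0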
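With these handlers in place, the power saver (or any other node, or a plain "ros2 service call /oakd/stop_camera std_srvs/srv/Trigger" from a shell) can pause and resume the OAK-D pipeline through standard Trigger services. A minimal client sketch; the client node name here is a made-up example:

    import rclpy
    from rclpy.node import Node
    from std_srvs.srv import Trigger

    rclpy.init()
    node = Node('camera_power_client')  # hypothetical client node name
    client = node.create_client(Trigger, 'oakd/stop_camera')
    client.wait_for_service()

    # Trigger takes an empty request; success/message come back in the response
    future = client.call_async(Trigger.Request())
    rclpy.spin_until_future_complete(node, future)
    result = future.result()
    print(result.success, result.message)

    node.destroy_node()
    rclpy.shutdown()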