import os
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import ndimage
import random
import cv2
import time
import pickle
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline
from segment_anything import SamPredictor, sam_model_registry, SamAutomaticMaskGenerator
def generate_segments(image, checkpoint, model_type, min_size=None, max_size=None):
'''
Use Meta AI's Segment Anything Model to generate unique segmentations for each material in the image.
Apply a final round of Watershed segmentation to correct any artifacts of Segment Anything.
:param image: Input image to be segmented.
:param checkpoint: Segment Anything Model weights checkpoint.
:param model_type: Model type of the weights checkpoint input.
:param min_size: Minimum allowable size of output segments.
    :param max_size: Maximum allowable size of output segments.
    :return: Labeled array of cleaned segments, or the raw list of SAM masks if min_size is not given.
'''
sam = sam_model_registry[model_type](checkpoint=checkpoint)
# sam.to(device='cuda')
    predictor = SamPredictor(sam)  # instantiated alongside the model; only the automatic mask generator is used below
mask_generator = SamAutomaticMaskGenerator(sam)
masks = mask_generator.generate(image)
objects = []
if max_size: # remove large objects
for n in range(len(masks)):
if np.sum(masks[n]['segmentation']) < max_size:
objects.append(masks[n]['segmentation'])
else:
for n in range(len(masks)):
objects.append(masks[n]['segmentation'])
if min_size:
objects_cleaned = np.zeros(image.shape[:2]).astype(np.uint8)
obj_label = 1
for n in range(len(objects)): # separate any joined droplets
img = objects[n].astype(np.uint8)
# segment droplets via distance mapping and thresholding
            dt = cv2.distanceTransform(img, 2, 3)  # distanceType=cv2.DIST_L2, maskSize=3
dt = ((dt - dt.min()) / (dt.max() - dt.min()) * 255).astype(np.uint8)
_, dt = cv2.threshold(dt, 0, 255, cv2.THRESH_BINARY)
# obtain the map of segmented droplets with corresponding indices
lbl, ncc = ndimage.label(dt)
unique_labels = np.unique(lbl)[1:] # exclude zero (background)
            for m in range(len(unique_labels)):  # separate index to avoid shadowing the outer loop variable
                labeled_objs = np.copy(lbl)
                labeled_objs[lbl != unique_labels[m]] = 0
# remove small objects
if np.sum(labeled_objs != 0) > min_size:
old_comp = (objects_cleaned != 0).astype(int)
new_comp = (labeled_objs != 0).astype(int)
if np.sum(old_comp & new_comp) == 0: # ensure no overlapping objects
objects_cleaned += ((labeled_objs != 0).astype(int) * obj_label).astype(np.uint8)
obj_label += 1 # increase counter
# final cleaning loop to remove overlaps generated by ndimage.label()
unique_labels = np.unique(objects_cleaned)[1:]
for n in range(len(unique_labels)):
if np.sum(objects_cleaned == unique_labels[n]) < min_size:
objects_cleaned[objects_cleaned == unique_labels[n]] = 0 # remove
else:
objects_cleaned = objects
return objects_cleaned
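# Illustrative usage sketch (hypothetical inputs; the checkpoint filename and model
# type below are assumptions and must match the SAM weights actually downloaded):
#   image = cv2.cvtColor(cv2.imread('droplets.png'), cv2.COLOR_BGR2RGB)
#   segments = generate_segments(image, checkpoint='sam_vit_h_4b8939.pth',
#                                model_type='vit_h', min_size=200, max_size=50000)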
def probe_contact(midpoint, rotation, probe_stroke_px):
'''
Determines the contacted pixels of the probe within the image for a given pose.
:param midpoint: Midpoint coordinates of the probe pose.
:param rotation: Pose rotation of the probe.
:param probe_stroke_px: The probe width, measured in pixels.
    :return: Arrays of [start, end] x-coordinates and [start, end] y-coordinates of the probe contact line for each pose.
'''
mid_y = midpoint[:,0]
mid_x = midpoint[:,1]
# Convert rotation angle to radians
rotation_radians = np.radians(-np.array(rotation))
# Calculate half of the line segment offsets
dx = (probe_stroke_px / 2) * np.cos(rotation_radians)
dy = (probe_stroke_px / 2) * np.sin(rotation_radians)
# Calculate start and end points
start_x = mid_x - dx
start_y = mid_y - dy
end_x = mid_x + dx
end_y = mid_y + dy
return np.array([start_x, end_x]).T.round(0).astype(int), np.array([start_y, end_y]).T.round(0).astype(int)
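# Worked example (illustrative): for a single pose with midpoint (row=50, col=80),
# rotation 0 degrees, and a 20 px probe stroke, the contact endpoints are columns
# 70 and 90 at row 50:
#   xs, ys = probe_contact(np.array([[50, 80]]), [0], probe_stroke_px=20)
#   # xs -> [[70, 90]], ys -> [[50, 50]]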
def generate_valid_poses(droplet, num_poses, max_angle, probe_stroke_px):
'''
Generate sets of valid poses to be later optimized.
A pose is valid if it abides by the probe geometry and fits within the material's area.
:param droplet: The individual material droplet segment to generate valid poses for.
:param num_poses: Number of poses to generate within the material.
:param max_angle: Maximum allowable angle of the pose contact in the yaw-axis.
:param probe_stroke_px: The probe width, measured in pixels.
:return: X, Y arrays of pixels where probe contact occurs, midpoints, and angles of generated valid poses.
'''
# erode edges of droplet
kernel = np.ones((6, 6), np.uint8)
droplet = cv2.erode(droplet.astype(np.uint8), kernel).astype(np.float16)
probe_px_x = []
probe_px_y = []
valid_midpoints = []
valid_rotations = []
tested_poses = 0
while (len(probe_px_x) < num_poses) and (
tested_poses < num_poses * 100): # stop after num_poses of valid poses are collected or num_poses*100 poses are tested
possible_midpoints = np.array(np.nonzero(droplet)).T
        midpoint_idx = random.choices(range(len(possible_midpoints)), k=num_poses)  # range end is exclusive, so all candidate midpoints are selectable
selected_rotations = random.choices(range(0, max_angle), k=num_poses)
selected_midpoints = possible_midpoints[midpoint_idx]
probe_x, probe_y = probe_contact(midpoint=selected_midpoints, rotation=selected_rotations,
probe_stroke_px=probe_stroke_px)
# check if probe poses are valid
for i in range(len(probe_x)):
            line_y = np.arange(*probe_x[i], 1)  # column indices spanned by the probe line
            line_x = np.linspace(*probe_y[i], len(line_y)).round(0).astype(int)  # matching row indices
if all(droplet[line_x, line_y] == 1): # if pose is fully within droplet
probe_px_x.append(line_x)
probe_px_y.append(line_y)
valid_midpoints.append(selected_midpoints[i])
valid_rotations.append(selected_rotations[i])
tested_poses += num_poses
return probe_px_x, probe_px_y, valid_midpoints, valid_rotations
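# Illustrative usage sketch (hypothetical values; 'single_mask' is one binary segment
# taken from generate_segments): sample up to 50 candidate probe poses inside the
# droplet, allowing yaw angles up to 180 degrees:
#   px, py, mids, rots = generate_valid_poses(droplet=single_mask, num_poses=50,
#                                             max_angle=180, probe_stroke_px=20)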
def reward_function(droplet, poses, max_poses, max_angle, probe_stroke_px, verbose=False):
'''
Objective function used to optimize the probe poses.
Maximizes: (1) pose spatial variation and (2) pose angle uniqueness.
:param droplet: The individual material droplet segment that the poses belong to.
    :param poses: Number of poses to select for the material.
:param max_poses: Maximum number of poses to generate for optimization.
:param max_angle: Maximum allowable angle of the pose contact in the yaw-axis.
:param probe_stroke_px: The probe width, measured in pixels.
:param verbose: Set to True to show the individually optimized poses plotted on the material.
    :return: Selected poses, their pixel contacts, per-iteration pose rewards, and the total pixel area of the material.
'''
droplet = droplet.astype(np.int8) # allow signs
droplet[droplet != 0] = 1
probe_x, probe_y, midpoints, rotations = generate_valid_poses(droplet=droplet, num_poses=max_poses,
max_angle=max_angle,
probe_stroke_px=probe_stroke_px)
# construct reward function
pose_rewards_all = []
selected_poses = []
pixel_poses = []
total_pixels = 0
if len(probe_x) != 0: # check to see if droplet is too small to measure with probe size
frame_pose1 = droplet.copy()
best = np.argmin(rotations)
rotation1 = rotations[best]
total_pixels = droplet.sum()
frame_pose1[probe_x[best], probe_y[best]] = -1
selected_poses.append([*midpoints[best], rotations[best]])
pixel_poses.append([probe_x[best], probe_y[best]])
for p in range(poses - 1):
rewards = []
for n in range(len(probe_x)): # next pose
frame_pose2 = frame_pose1.copy()
frame_pose2[probe_x[n], probe_y[n]] = -1
space_obj = 1 - frame_pose2.sum() / total_pixels # most space covered => maximize
                angle_obj = np.sum(np.abs(np.array(selected_poses)[:, 2] - rotations[n]) / max_angle)  # largest difference in angle => maximize
reward = (space_obj + angle_obj) / 2
rewards.append([n, reward])
rewards = np.array(rewards)
best = int(rewards[np.argmax(rewards[:, 1]), 0])
pose_rewards_all.append(rewards)
frame_pose1[probe_x[best], probe_y[best]] = -1 # update frame pose
selected_poses.append([*midpoints[best], rotations[best]])
pixel_poses.append([probe_x[best], probe_y[best]])
if verbose:
plt.imshow(frame_pose1, vmin=-1, vmax=1, cmap='bwr_r')
plt.show()
else:
pass # droplet is too small
return selected_poses, pixel_poses, np.array(pose_rewards_all), total_pixels
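# Illustrative usage sketch (hypothetical values; 'single_mask' as above): pick 3
# spatially spread, angle-diverse poses from a pool of 50 candidates:
#   sel_poses, sel_pixels, rewards, area = reward_function(droplet=single_mask, poses=3,
#                                                          max_poses=50, max_angle=180,
#                                                          probe_stroke_px=20)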
def euclidean_distance(a,b):
'''
Computes the Euclidean distance of each path segment between a point "a" and all other nodes "b".
:param a: The prior point in the path.
:param b: The set of all other nodes on the path with shape (n,2).
:return: The distance computation for each edge between nodes "a" and all "b".
'''
return np.sqrt((a[0] - b[:,0])**2 + (a[1]-b[:,1])**2)
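# Worked example (illustrative): euclidean_distance([0, 0], np.array([[3, 4], [6, 8]]))
# returns array([5., 10.]).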
def path_planning(poses, noise_level, start=[0,0], optimization_rounds=1000):
'''
Perform optimization of path plans using stochastic nearest neighbors at varying noise levels.
:param poses: Optimized poses used to generate the path branches.
    :param noise_level: Base stochastic noise magnitude. Seven noise levels spanning noise_level/2 to noise_level*2 are tested.
:param start: The start point of the path. [0,0] is the upper left corner.
:param optimization_rounds: Number of optimization iterations.
    :return: Candidate path plans (ordered poses) and their total distances for every tested noise level and optimization round.
'''
noise_distances = []
noise_plans = []
if noise_level:
        noise_levels = np.linspace(noise_level/2, noise_level*2, 7).astype(int)  # 7 tested levels spanning noise_level/2 to noise_level*2
else:
noise_levels = [0]
for l in range(len(noise_levels)):
optimized_distances = []
optimized_plans = []
for o in range(optimization_rounds):
poses_i = poses.copy()
planned_poses = [np.array(start)] # ordered
total_distance = []
for n in range(len(poses)):
dist = euclidean_distance(planned_poses[n], poses_i)
if noise_level:
noise = np.random.randint(-noise_levels[l], noise_levels[l], size=len(dist))
nearest = np.argmin(dist + noise)
else:
nearest = np.argmin(dist)
total_distance.append(dist[nearest])
planned_poses.append(poses_i[nearest])
poses_i = np.delete(poses_i, nearest, axis=0)
optimized_distances.append(np.sum(total_distance))
optimized_plans.append(np.array(planned_poses))
noise_distances.append(optimized_distances)
noise_plans.append(optimized_plans)
return noise_plans, noise_distances
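# Illustrative usage sketch (hypothetical values; 'sel_poses' follows the
# reward_function sketch above): plan a visiting order over the selected pose
# midpoints and keep the shortest candidate route across all tested noise levels:
#   midpoints_xy = np.array(sel_poses)[:, :2]
#   plans, dists = path_planning(midpoints_xy, noise_level=20, optimization_rounds=500)
#   dists = np.array(dists)                     # shape: (noise levels, optimization rounds)
#   lvl, rnd = np.unravel_index(dists.argmin(), dists.shape)
#   best_plan = plans[lvl][rnd]                 # ordered waypoints, starting at [0, 0]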