Skip to content

Commit

Permalink
fix airo-camera-toolkit mypy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Victorlouisdg committed Jul 30, 2024
1 parent 8c12d87 commit 4f41a90
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def __init__(
self.rgb_shm_array: np.ndarray = np.ndarray(rgb_shape, dtype=np.uint8, buffer=self.rgb_shm.buf)

# Preallocate the buffer array to avoid reallocation at each retrieve.
self.rgb_buffer_array = np.ndarray(rgb_shape, dtype=np.uint8)
self.rgb_buffer_array: np.ndarray = np.ndarray(rgb_shape, dtype=np.uint8)

self.previous_timestamp = time.time()

Expand Down Expand Up @@ -379,31 +379,31 @@ def _close_shared_memory(self) -> None:

if self.rgb_shm is not None:
self.rgb_shm.close()
self.rgb_shm = None
self.rgb_shm = None # type: ignore

if self.rgb_shape_shm is not None:
self.rgb_shape_shm.close()
self.rgb_shape_shm = None
self.rgb_shape_shm = None # type: ignore

if self.timestamp_shm is not None:
self.timestamp_shm.close()
self.timestamp_shm = None
self.timestamp_shm = None # type: ignore

if self.intrinsics_shm is not None:
self.intrinsics_shm.close()
self.intrinsics_shm = None
self.intrinsics_shm = None # type: ignore

if self.fps_shm is not None:
self.fps_shm.close()
self.fps_shm = None
self.fps_shm = None # type: ignore

if self.write_lock_shm is not None:
self.write_lock_shm.close()
self.write_lock_shm = None
self.write_lock_shm = None # type: ignore

if self.read_lock_shm is not None:
self.read_lock_shm.close()
self.read_lock_shm = None
self.read_lock_shm = None # type: ignore

def __del__(self) -> None:
self._close_shared_memory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,12 @@ def __init__(self, shared_memory_namespace: str) -> None:
resource_tracker.unregister(self.depth_shape_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.depth_image_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.depth_image_shape_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.confidence_map_shm._name, "shared_memory") # type: ignore[attr-defined)
resource_tracker.unregister(self.confidence_map_shape_shm._name, "shared_memory") # type: ignore[attr-defined)
resource_tracker.unregister(self.point_cloud_positions_shm._name, "shared_memory") # type: ignore[attr-defined)
resource_tracker.unregister(self.point_cloud_positions_shape_shm._name, "shared_memory") # type: ignore[attr-defined)
resource_tracker.unregister(self.point_cloud_colors_shm._name, "shared_memory") # type: ignore[attr-defined)
resource_tracker.unregister(self.point_cloud_colors_shape_shm._name, "shared_memory") # type: ignore[attr-defined)
resource_tracker.unregister(self.confidence_map_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.confidence_map_shape_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.point_cloud_positions_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.point_cloud_positions_shape_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.point_cloud_colors_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.point_cloud_colors_shape_shm._name, "shared_memory") # type: ignore[attr-defined]

self.depth_shape_shm_array: np.ndarray = np.ndarray((2,), dtype=np.int64, buffer=self.depth_shape_shm.buf)
self.depth_image_shape_shm_array: np.ndarray = np.ndarray(
Expand Down Expand Up @@ -292,11 +292,11 @@ def __init__(self, shared_memory_namespace: str) -> None:
point_cloud_colors_shape, dtype=np.uint8, buffer=self.point_cloud_colors_shm.buf
)

self.depth_buffer_array = np.ndarray(depth_shape, dtype=np.float32)
self.depth_image_buffer_array = np.ndarray(depth_image_shape, dtype=np.uint8)
self.confidence_map_buffer_array = np.ndarray(confidence_map_shape, dtype=np.float32)
self.point_cloud_positions_buffer_array = np.ndarray(point_cloud_positions_shape, dtype=np.float32)
self.point_cloud_colors_buffer_array = np.ndarray(point_cloud_colors_shape, dtype=np.uint8)
self.depth_buffer_array: np.ndarray = np.ndarray(depth_shape, dtype=np.float32)
self.depth_image_buffer_array: np.ndarray = np.ndarray(depth_image_shape, dtype=np.uint8)
self.confidence_map_buffer_array: np.ndarray = np.ndarray(confidence_map_shape, dtype=np.float32)
self.point_cloud_positions_buffer_array: np.ndarray = np.ndarray(point_cloud_positions_shape, dtype=np.float32)
self.point_cloud_colors_buffer_array: np.ndarray = np.ndarray(point_cloud_colors_shape, dtype=np.uint8)

def _retrieve_depth_map(self) -> NumpyDepthMapType:
while self.write_lock_shm_array[0]:
Expand Down Expand Up @@ -339,43 +339,43 @@ def _close_shared_memory(self) -> None: # noqa C901

if self.depth_shm is not None:
self.depth_shm.close()
self.depth_shm = None
self.depth_shm = None # type: ignore

if self.depth_shape_shm is not None:
self.depth_shape_shm.close()
self.depth_shape_shm = None
self.depth_shape_shm = None # type: ignore

if self.depth_image_shm is not None:
self.depth_image_shm.close()
self.depth_image_shm = None
self.depth_image_shm = None # type: ignore

if self.depth_image_shape_shm is not None:
self.depth_image_shape_shm.close()
self.depth_image_shape_shm = None
self.depth_image_shape_shm = None # type: ignore

if self.confidence_map_shm is not None:
self.confidence_map_shm.close()
self.confidence_map_shm = None
self.confidence_map_shm = None # type: ignore

if self.confidence_map_shape_shm is not None:
self.confidence_map_shape_shm.close()
self.confidence_map_shape_shm = None
self.confidence_map_shape_shm = None # type: ignore

if self.point_cloud_positions_shm is not None:
self.point_cloud_positions_shm.close()
self.point_cloud_positions_shm = None
self.point_cloud_positions_shm = None # type: ignore

if self.point_cloud_positions_shape_shm is not None:
self.point_cloud_positions_shape_shm.close()
self.point_cloud_positions_shape_shm = None
self.point_cloud_positions_shape_shm = None # type: ignore

if self.point_cloud_colors_shm is not None:
self.point_cloud_colors_shm.close()
self.point_cloud_colors_shm = None
self.point_cloud_colors_shm = None # type: ignore

if self.point_cloud_colors_shape_shm is not None:
self.point_cloud_colors_shape_shm.close()
self.point_cloud_colors_shape_shm = None
self.point_cloud_colors_shape_shm = None # type: ignore

def __del__(self) -> None:
super().__del__()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ def __init__(self, shared_memory_namespace: str) -> None:

logger.info("Found stereo shared memory blocks.")

resource_tracker.unregister(self.rgb_right_shm._name, "shared_memory")
resource_tracker.unregister(self.rgb_right_shape_shm._name, "shared_memory")
resource_tracker.unregister(self.pose_right_in_left_shm._name, "shared_memory")
resource_tracker.unregister(self.intrinsics_right_shm._name, "shared_memory")
resource_tracker.unregister(self.rgb_right_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.rgb_right_shape_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.pose_right_in_left_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.intrinsics_right_shm._name, "shared_memory") # type: ignore[attr-defined]

self.rgb_right_shape_shm_array: np.ndarray = np.ndarray(
(3,), dtype=np.int64, buffer=self.rgb_right_shape_shm.buf
Expand All @@ -212,7 +212,7 @@ def __init__(self, shared_memory_namespace: str) -> None:
rgb_right_shape, dtype=np.uint8, buffer=self.rgb_right_shm.buf
)

self.rgb_right_buffer_array = np.ndarray(rgb_right_shape, dtype=np.uint8)
self.rgb_right_buffer_array: np.ndarray = np.ndarray(rgb_right_shape, dtype=np.uint8)

def _retrieve_rgb_image(self, view: str = StereoRGBDCamera.LEFT_RGB) -> NumpyFloatImageType:
image = self._retrieve_rgb_image_as_int(view)
Expand Down Expand Up @@ -255,19 +255,19 @@ def _close_shared_memory(self) -> None:

if self.rgb_right_shm is not None:
self.rgb_right_shm.close()
self.rgb_right_shm = None
self.rgb_right_shm = None # type: ignore

if self.rgb_right_shape_shm is not None:
self.rgb_right_shape_shm.close()
self.rgb_right_shape_shm = None
self.rgb_right_shape_shm = None # type: ignore

if self.pose_right_in_left_shm is not None:
self.pose_right_in_left_shm.close()
self.pose_right_in_left_shm = None
self.pose_right_in_left_shm = None # type: ignore

if self.intrinsics_right_shm is not None:
self.intrinsics_right_shm.close()
self.intrinsics_right_shm = None
self.intrinsics_right_shm = None # type: ignore

def __del__(self) -> None:
super().__del__()
Expand Down
5 changes: 3 additions & 2 deletions airo-camera-toolkit/airo_camera_toolkit/cameras/zed/zed2i.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,13 @@ def _retrieve_rgb_image(self, view: str = StereoRGBDCamera.LEFT_RGB) -> NumpyFlo

def _retrieve_rgb_image_as_int(self, view: str = StereoRGBDCamera.LEFT_RGB) -> NumpyIntImageType:
assert view in StereoRGBDCamera._VIEWS
image_bgra: OpenCVIntImageType
if view == StereoRGBDCamera.RIGHT_RGB:
self.camera.retrieve_image(self.image_matrix_right, sl.VIEW.RIGHT)
image_bgra: OpenCVIntImageType = self.image_matrix_right.get_data()
image_bgra = self.image_matrix_right.get_data()
else:
self.camera.retrieve_image(self.image_matrix, sl.VIEW.LEFT)
image_bgra: OpenCVIntImageType = self.image_matrix.get_data()
image_bgra = self.image_matrix.get_data()

image = cv2.cvtColor(image_bgra, cv2.COLOR_BGRA2RGB)
return image
Expand Down
4 changes: 4 additions & 0 deletions airo-camera-toolkit/airo_camera_toolkit/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def get_rgb_image_as_int(self, view: str = LEFT_RGB) -> NumpyIntImageType:
@abc.abstractmethod
def _retrieve_rgb_image(self, view: str = LEFT_RGB) -> NumpyFloatImageType:
raise NotImplementedError

@abc.abstractmethod
def _retrieve_rgb_image_as_int(self, view: str = LEFT_RGB) -> NumpyIntImageType:
raise NotImplementedError

# TODO: check view argument value?
@abc.abstractmethod
Expand Down

0 comments on commit 4f41a90

Please sign in to comment.