diff --git a/dimos/manipulation/dynamic_tracking/__init__.py b/dimos/manipulation/dynamic_tracking/__init__.py new file mode 100644 index 0000000000..5e3fce21a9 --- /dev/null +++ b/dimos/manipulation/dynamic_tracking/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Visual servoing with eye-in-hand ArUco marker tracking.""" diff --git a/dimos/manipulation/dynamic_tracking/aruco_tracker.py b/dimos/manipulation/dynamic_tracking/aruco_tracker.py new file mode 100644 index 0000000000..2457661b43 --- /dev/null +++ b/dimos/manipulation/dynamic_tracking/aruco_tracker.py @@ -0,0 +1,392 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ArUco marker tracker for visual servoing. + +Detects ArUco markers in camera images, estimates their pose relative to the +camera, publishes transforms to the TF tree, and outputs a target EE pose +(as PoseStamped) for the ControlCoordinator's CartesianIKTask. + +The visual servo loop is: + camera image → ArUco detection → marker pose in camera frame + → TF lookup (base_link → marker) → compute reach pose + → publish PoseStamped on cartesian_command → CartesianIKTask handles IK + +TF tree (eye-in-hand): + base_link → ... → ee_link → camera_link → camera_color_optical_frame → aruco_avg +""" + +from dataclasses import dataclass, field +import math +from threading import Event, Thread +import time +from typing import Any + +import cv2 +import numpy as np +import rerun as rr +from scipy.spatial.transform import Rotation # type: ignore[import-untyped] + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, Transform, Vector3 +from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +def _estimate_marker_poses( + corners: list[np.ndarray], + marker_size: float, + camera_matrix: np.ndarray, + dist_coeffs: np.ndarray, +) -> tuple[np.ndarray, np.ndarray]: + """Estimate per-marker pose via solvePnP (replaces deprecated estimatePoseSingleMarkers). + + Returns rvecs, tvecs each shaped (N, 1, 3) to match the legacy API. 
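+
+    For example, with four accepted markers the outputs have shape (4, 1, 3):
+    rvecs[i][0] is the Rodrigues rotation vector and tvecs[i][0] the translation
+    (in meters) of marker i in the camera frame. Markers whose solvePnP call
+    fails are dropped, so the output count may be smaller than len(corners).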
+ """ + half = marker_size / 2.0 + # Marker corner order from cv2.aruco matches this object-point layout + obj_points = np.array( + [[-half, half, 0.0], [half, half, 0.0], [half, -half, 0.0], [-half, -half, 0.0]], + dtype=np.float32, + ) + rvecs_out: list[np.ndarray] = [] + tvecs_out: list[np.ndarray] = [] + for c in corners: + ok, rvec, tvec = cv2.solvePnP( + obj_points, c.reshape(-1, 2), camera_matrix, dist_coeffs, flags=cv2.SOLVEPNP_IPPE_SQUARE + ) + if ok: + rvecs_out.append(rvec.flatten()) + tvecs_out.append(tvec.flatten()) + + if not rvecs_out: + return np.zeros((0, 1, 3), dtype=np.float64), np.zeros((0, 1, 3), dtype=np.float64) + + return ( + np.array(rvecs_out, dtype=np.float64)[:, np.newaxis, :], + np.array(tvecs_out, dtype=np.float64)[:, np.newaxis, :], + ) + + +@dataclass +class ArucoTrackerConfig(ModuleConfig): + """Configuration for the ArUco tracker module.""" + + # ArUco detection + marker_size: float = 0.1 # meters + aruco_dict: int = cv2.aruco.DICT_4X4_50 + expected_marker_count: int = 4 + + # Processing + camera_frame_id: str = "camera_color_optical_frame" + rate: float = 15.0 # Hz + max_loops: int = 0 # 0 = run forever + + # Visual servo target + target_task_name: str = "cartesian_ik_arm" # CartesianIKTask name in coordinator + enable_servo: bool = True + reach_offset: list[float] = field( + default_factory=lambda: [0.0, 0.0, 0.10] + ) # xyz offset in base frame + min_move_distance_m: float = 0.003 # skip moves smaller than this + + # Default orientation for reach pose (RPY in radians, pointing down) + reach_orientation_rpy: list[float] = field(default_factory=lambda: [math.pi, 0.0, 0.0]) + use_marker_orientation: bool = False + + +class ArucoTracker(Module[ArucoTrackerConfig]): + """ArUco marker tracker with visual servoing output. + + Detects ArUco markers, publishes their transforms to the TF tree, + and outputs a target EE PoseStamped for CartesianIKTask-based servoing. 
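+
+    The servo command is published on cartesian_command with frame_id set to
+    config.target_task_name (default "cartesian_ik_arm"); the ControlCoordinator
+    uses this frame_id to route the pose to the matching CartesianIKTask.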
+ """ + + # Inputs + color_image: In[Image] + camera_info: In[CameraInfo] + + # Outputs + annotated_image: Out[Image] + cartesian_command: Out[PoseStamped] # target EE pose for visual servo + + config: ArucoTrackerConfig + default_config = ArucoTrackerConfig + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + # ArUco detector + self._aruco_dict = cv2.aruco.getPredefinedDictionary(self.config.aruco_dict) + self._aruco_params = cv2.aruco.DetectorParameters() + self._detector = cv2.aruco.ArucoDetector(self._aruco_dict, self._aruco_params) + + # Camera intrinsics (populated from CameraInfo) + self._camera_matrix: np.ndarray | None = None + self._dist_coeffs: np.ndarray | None = None + self._latest_image: Image | None = None + + # Processing thread + self._stop_event = Event() + self._processing_thread: Thread | None = None + self._loop_count = 0 + + # Last published position for min-move filter + self._last_target_position: np.ndarray | None = None + + # ========================================================================= + # Lifecycle + # ========================================================================= + + @rpc + def start(self) -> None: + super().start() + self._disposables.add(self.camera_info.observable().subscribe(self._update_camera_info)) + self._disposables.add(self.color_image.observable().subscribe(self._store_latest_image)) + + self._loop_count = 0 + self._stop_event.clear() + self._processing_thread = Thread( + target=self._processing_loop, daemon=True, name="ArucoTracker" + ) + self._processing_thread.start() + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._processing_thread is not None and self._processing_thread.is_alive(): + self._processing_thread.join(timeout=2.0) + super().stop() + + # ========================================================================= + # Callbacks + # ========================================================================= + + def _store_latest_image(self, image: Image) -> None: + self._latest_image = image + + def _update_camera_info(self, camera_info: CameraInfo) -> None: + if len(camera_info.K) == 9: + fx, _, cx, _, fy, cy, _, _, _ = camera_info.K + self._camera_matrix = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float32) + self._dist_coeffs = ( + np.array(camera_info.D, dtype=np.float32) + if camera_info.D + else np.zeros(5, dtype=np.float32) + ) + + # ========================================================================= + # Processing loop + # ========================================================================= + + def _processing_loop(self) -> None: + period = 1.0 / self.config.rate + logger.info(f"ArUco processing loop started at {self.config.rate}Hz") + + while not self._stop_event.is_set(): + if self.config.max_loops > 0 and self._loop_count >= self.config.max_loops: + break + + loop_start = time.time() + try: + if self._latest_image is not None: + self._update_tracking(self._latest_image) + self._loop_count += 1 + except Exception as e: + logger.error(f"Error in ArUco processing: {e}") + + elapsed = time.time() - loop_start + sleep_time = period - elapsed + if sleep_time > 0: + self._stop_event.wait(sleep_time) + + logger.info(f"ArUco processing loop completed after {self._loop_count} iterations") + + def _update_tracking(self, image: Image) -> None: + if self._camera_matrix is None or self._dist_coeffs is None: + return + + # Convert to grayscale for detection + if image.format.name == "RGB": + display_image = image.data.copy() + gray = 
cv2.cvtColor(image.data, cv2.COLOR_RGB2GRAY) + elif image.format.name == "BGR": + display_image = image.data.copy() + gray = cv2.cvtColor(image.data, cv2.COLOR_BGR2GRAY) + else: + display_image = image.data.copy() + gray = image.data + + corners, ids, _ = self._detector.detectMarkers(gray) + + num_detected = 0 if ids is None else len(ids) + if num_detected != self.config.expected_marker_count: + logger.debug(f"Detected {num_detected}/{self.config.expected_marker_count} markers") + self._publish_annotated_image(display_image, image.format.name) + return + + # Estimate poses (failed solvePnP results are filtered out) + rvecs, tvecs = _estimate_marker_poses( + list(corners), self.config.marker_size, self._camera_matrix, self._dist_coeffs + ) + if len(rvecs) != self.config.expected_marker_count: + logger.debug( + f"Only {len(rvecs)}/{self.config.expected_marker_count} markers " + f"produced a valid solvePnP pose" + ) + self._publish_annotated_image(display_image, image.format.name) + return + avg_position, avg_quat = self._average_marker_poses(rvecs, tvecs) + + # Publish transform: camera_optical → aruco_avg + ts = image.ts + aruco_tf = Transform( + translation=Vector3( + float(avg_position[0]), float(avg_position[1]), float(avg_position[2]) + ), + rotation=Quaternion( + float(avg_quat[0]), float(avg_quat[1]), float(avg_quat[2]), float(avg_quat[3]) + ), + frame_id=self.config.camera_frame_id, + child_frame_id="aruco_avg", + ts=ts, + ) + self.tf.publish(aruco_tf) + rr.log("world/tf/aruco_avg", aruco_tf.to_rerun()) + + # Visual servo: compute and publish target EE pose + if self.config.enable_servo: + self._servo_to_marker(ts) + + # Draw markers + self._draw_markers(display_image, list(corners), ids, rvecs, tvecs, image.format.name) + + # ========================================================================= + # Visual servo + # ========================================================================= + + def _servo_to_marker(self, ts: float) -> None: + """Compute reach pose from marker and publish as cartesian command.""" + # Look up marker position in base_link frame via TF tree + aruco_wrt_base = self.tf.get("base_link", "aruco_avg") + if aruco_wrt_base is None: + logger.debug("TF lookup base_link→aruco_avg failed, skipping servo") + return + + t = aruco_wrt_base.translation + offset = self.config.reach_offset + target_pos = Vector3(t.x + offset[0], t.y + offset[1], t.z + offset[2]) + + # Min-move filter + pos_array = np.array([target_pos.x, target_pos.y, target_pos.z]) + if self._last_target_position is not None: + distance = float(np.linalg.norm(pos_array - self._last_target_position)) + if distance < self.config.min_move_distance_m: + return + + # Orientation: follow marker or use fixed default + if self.config.use_marker_orientation: + rpy = aruco_wrt_base.rotation.to_euler() + roll = ((rpy.x + math.pi + math.pi) % (2 * math.pi)) - math.pi + pitch = np.clip(rpy.y, -math.pi / 2, math.pi / 2) + yaw = np.clip(rpy.z + math.pi / 2, -math.pi / 2, math.pi / 2) + orientation = Quaternion.from_euler(Vector3(roll, pitch, yaw)) + else: + rpy = self.config.reach_orientation_rpy + orientation = Quaternion.from_euler(Vector3(rpy[0], rpy[1], rpy[2])) + + # Publish PoseStamped with frame_id = task name (for coordinator routing) + command = PoseStamped( + ts=ts, + frame_id=self.config.target_task_name, + position=target_pos, + orientation=orientation, + ) + self.cartesian_command.publish(command) + self._last_target_position = pos_array + + logger.debug(f"Servo target: [{target_pos.x:.3f}, 
{target_pos.y:.3f}, {target_pos.z:.3f}]") + + # ========================================================================= + # ArUco pose computation + # ========================================================================= + + def _average_marker_poses( + self, rvecs: np.ndarray, tvecs: np.ndarray + ) -> tuple[np.ndarray, np.ndarray]: + """Average multiple marker poses into a single pose.""" + rotations = [] + positions = [] + for i in range(len(rvecs)): + rot_matrix, _ = cv2.Rodrigues(rvecs[i][0]) + rotations.append(Rotation.from_matrix(rot_matrix)) + positions.append(tvecs[i][0]) + + avg_position = np.mean(positions, axis=0) + avg_rotation = Rotation.concatenate(rotations).mean() + avg_quat = avg_rotation.as_quat() # [x, y, z, w] + return avg_position, avg_quat + + # ========================================================================= + # Image annotation + # ========================================================================= + + def _publish_annotated_image(self, display_image: np.ndarray, image_format: str) -> None: + if image_format == "BGR": + publish_image = cv2.cvtColor(display_image, cv2.COLOR_BGR2RGB) + else: + publish_image = display_image + + annotated_msg = Image( + data=publish_image, + format=ImageFormat.RGB, + frame_id=self.config.camera_frame_id, + ts=time.time(), + ) + self.annotated_image.publish(annotated_msg) + rr.log("aruco/annotated", rr.Image(publish_image)) + + def _draw_markers( + self, + display_image: np.ndarray, + corners: list[np.ndarray], + ids: np.ndarray, + rvecs: np.ndarray, + tvecs: np.ndarray, + image_format: str, + ) -> None: + cv2.aruco.drawDetectedMarkers(display_image, corners, ids) + if self._camera_matrix is None or self._dist_coeffs is None: + self._publish_annotated_image(display_image, image_format) + return + for i in range(len(ids)): + cv2.drawFrameAxes( + display_image, + self._camera_matrix, + self._dist_coeffs, + rvecs[i][0], + tvecs[i][0], + self.config.marker_size * 0.5, + ) + self._publish_annotated_image(display_image, image_format) + + +aruco_tracker = ArucoTracker.blueprint + +__all__ = ["ArucoTracker", "ArucoTrackerConfig", "aruco_tracker"] diff --git a/dimos/manipulation/dynamic_tracking/blueprints.py b/dimos/manipulation/dynamic_tracking/blueprints.py new file mode 100644 index 0000000000..8c4d8dc01f --- /dev/null +++ b/dimos/manipulation/dynamic_tracking/blueprints.py @@ -0,0 +1,259 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Visual servoing blueprints: ArUco tracking + CartesianIK control. + +Wires RealSense camera, ArUco tracker, and ControlCoordinator with a +CartesianIKTask for eye-in-hand visual servoing. 
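+
+Eye-in-hand means the camera is mounted on the end effector: the RealSense is
+parented to ee_link through the hand-eye calibration transform
+(_DEFAULT_EEF_TO_CAMERA below), so the camera frames move with the arm.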
+ +The servo loop: + RealSense → ArucoTracker → PoseStamped → Coordinator.cartesian_command + → CartesianIKTask (IK) + → JointCommandOutput → hardware + +TF tree (eye-in-hand): + base_link → (coordinator joint_state FK) → ee_link → camera_link + → camera_color_optical_frame → aruco_avg + +Usage: + # Camera-only tracking (no arm movement): + dimos run aruco-tracker + + # Full visual servoing with mock arm (for testing): + dimos run aruco-servo-mock + + # Full visual servoing with XArm6: + dimos run aruco-servo-xarm6 +""" + +from __future__ import annotations + +import math +import os + +import cv2 + +from dimos.control.components import HardwareComponent, HardwareType, make_joints +from dimos.control.coordinator import TaskConfig, control_coordinator +from dimos.core.blueprints import autoconnect +from dimos.core.transport import LCMTransport +from dimos.hardware.sensors.camera.realsense import realsense_camera +from dimos.manipulation.dynamic_tracking.aruco_tracker import aruco_tracker +from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, Transform, Vector3 +from dimos.msgs.sensor_msgs import CameraInfo, JointState +from dimos.msgs.sensor_msgs.Image import Image +from dimos.utils.data import LfsPath + +# ============================================================================= +# Model paths for IK +# ============================================================================= + +_XARM6_MODEL_PATH = LfsPath("xarm_description/urdf/xarm6/xarm6.urdf") + +# XArm IP — override via DIMOS_XARM6_IP env var +_XARM6_IP = os.environ.get("DIMOS_XARM6_IP", "192.168.1.210") + +# ============================================================================= +# Camera calibration (EEF → camera transform) +# ============================================================================= +# Default eye-in-hand calibration for xArm6 + RealSense D435. +# To re-calibrate, run: +# python -m dimos.manipulation.dynamic_tracking.calibrate_hand_eye \ +# --arm-ip 192.168.1.210 --output calibration.json +# Then load with calibration_to_transform("calibration.json"). + +_DEFAULT_EEF_TO_CAMERA = Transform( + translation=Vector3(0.06693724, -0.0309563, 0.00691482), + rotation=Quaternion(0.70513398, 0.00535696, 0.70897578, -0.01052180), # xyzw +) + +# ============================================================================= +# Camera-only: ArUco tracker with RealSense (no arm control) +# ============================================================================= +# Detects markers and publishes TF transforms. No servo output. +# Useful for verifying detection, calibration, and TF tree. 
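+#
+# A printable marker for bench-testing detection can be generated with OpenCV
+# (illustrative sketch, not part of this module; marker id and pixel size are
+# arbitrary, and generateImageMarker requires OpenCV >= 4.7):
+#
+#   dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
+#   marker_img = cv2.aruco.generateImageMarker(dictionary, 0, 400)
+#   cv2.imwrite("aruco_4x4_50_id0.png", marker_img)
+#
+# Print it at 15 mm to match marker_size=0.015 used below.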
+ +aruco_tracker_realsense = ( + autoconnect( + realsense_camera( + width=848, + height=480, + fps=15, + camera_name="camera", + base_frame_id="ee_link", + base_transform=_DEFAULT_EEF_TO_CAMERA, + enable_depth=False, + ), + aruco_tracker( + marker_size=0.015, + aruco_dict=cv2.aruco.DICT_4X4_50, + expected_marker_count=4, + camera_frame_id="camera_color_optical_frame", + rate=15, + enable_servo=False, + ), + ) + .transports( + { + ("color_image", Image): LCMTransport("/camera/color", Image), + ("camera_info", CameraInfo): LCMTransport("/camera/color_info", CameraInfo), + ("annotated_image", Image): LCMTransport("/aruco/annotated", Image), + } + ) + .global_config(viewer_backend="rerun-native") +) + +# ============================================================================= +# Visual servoing with mock arm (for testing without hardware) +# ============================================================================= +# Full servo loop: ArUco detection → CartesianIK → mock arm. +# Camera is real (RealSense); arm is simulated. + +aruco_servo_mock = ( + autoconnect( + control_coordinator( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[ + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + adapter_type="mock", + ), + ], + tasks=[ + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=[f"arm_joint{i + 1}" for i in range(6)], + priority=10, + model_path=_XARM6_MODEL_PATH, + ee_joint_id=6, + ), + ], + ), + realsense_camera( + width=848, + height=480, + fps=15, + camera_name="camera", + base_frame_id="ee_link", + base_transform=_DEFAULT_EEF_TO_CAMERA, + enable_depth=False, + ), + aruco_tracker( + marker_size=0.015, + aruco_dict=cv2.aruco.DICT_4X4_50, + expected_marker_count=4, + camera_frame_id="camera_color_optical_frame", + rate=10, + enable_servo=True, + target_task_name="cartesian_ik_arm", + reach_offset=[0.0, 0.0, 0.10], + reach_orientation_rpy=[math.pi, 0.0, 0.0], + min_move_distance_m=0.003, + ), + ) + .transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + ("cartesian_command", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("color_image", Image): LCMTransport("/camera/color", Image), + ("camera_info", CameraInfo): LCMTransport("/camera/color_info", CameraInfo), + ("annotated_image", Image): LCMTransport("/aruco/annotated", Image), + } + ) + .global_config(viewer_backend="rerun-native") +) + + +# ============================================================================= +# Visual servoing with XArm6 (real hardware) +# ============================================================================= +# Full servo loop: ArUco detection → CartesianIK → XArm6. +# Both camera and arm are real hardware. 
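+#
+# The arm address comes from _XARM6_IP above; override it before launching,
+# e.g. (IP shown is illustrative only):
+#
+#   DIMOS_XARM6_IP=192.168.1.221 dimos run aruco-servo-xarm6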
+ +aruco_servo_xarm6 = ( + autoconnect( + control_coordinator( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[ + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + adapter_type="xarm", + address=_XARM6_IP, + auto_enable=True, + ), + ], + tasks=[ + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=[f"arm_joint{i + 1}" for i in range(6)], + priority=10, + model_path=_XARM6_MODEL_PATH, + ee_joint_id=6, + ), + ], + ), + realsense_camera( + width=848, + height=480, + fps=15, + camera_name="camera", + base_frame_id="ee_link", + base_transform=_DEFAULT_EEF_TO_CAMERA, + enable_depth=False, + ), + aruco_tracker( + marker_size=0.015, + aruco_dict=cv2.aruco.DICT_4X4_50, + expected_marker_count=4, + camera_frame_id="camera_color_optical_frame", + rate=4, + enable_servo=True, + target_task_name="cartesian_ik_arm", + reach_offset=[0.0, 0.0, 0.10], + reach_orientation_rpy=[math.pi, 0.0, 0.0], + min_move_distance_m=0.005, + ), + ) + .transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + ("cartesian_command", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("color_image", Image): LCMTransport("/camera/color", Image), + ("camera_info", CameraInfo): LCMTransport("/camera/color_info", CameraInfo), + ("annotated_image", Image): LCMTransport("/aruco/annotated", Image), + } + ) + .global_config(viewer_backend="rerun-native") +) + + +__all__ = [ + "aruco_servo_mock", + "aruco_servo_xarm6", + "aruco_tracker_realsense", +] diff --git a/dimos/manipulation/dynamic_tracking/calibrate_hand_eye.py b/dimos/manipulation/dynamic_tracking/calibrate_hand_eye.py new file mode 100644 index 0000000000..f992805bfe --- /dev/null +++ b/dimos/manipulation/dynamic_tracking/calibrate_hand_eye.py @@ -0,0 +1,408 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Eye-in-hand calibration tool for computing the EEF→camera transform. + +Collects paired (EE pose, marker-in-camera pose) samples while the user +moves the arm to different configurations, then solves AX=XB using +OpenCV's calibrateHandEye(). + +Usage: + python -m dimos.manipulation.dynamic_tracking.calibrate_hand_eye \\ + --arm-ip 192.168.1.210 \\ + --marker-size 0.015 \\ + --output calibration.json + + # The tool will: + # 1. Connect to the arm and camera + # 2. Prompt you to move the arm to different poses (marker must be visible) + # 3. Press Enter to capture each sample (minimum 3, recommend 10+) + # 4. Press 'q' to finish collecting and solve + # 5. 
Save the EEF→camera transform to JSON +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +import sys +import time +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Callable + +import cv2 +import numpy as np +from scipy.spatial.transform import Rotation # type: ignore[import-untyped] + + +def _estimate_marker_pose( + corners: np.ndarray, + marker_size: float, + camera_matrix: np.ndarray, + dist_coeffs: np.ndarray, +) -> tuple[np.ndarray, np.ndarray] | None: + """Estimate single marker pose via solvePnP (replaces deprecated estimatePoseSingleMarkers).""" + half = marker_size / 2.0 + obj_points = np.array( + [[-half, half, 0.0], [half, half, 0.0], [half, -half, 0.0], [-half, -half, 0.0]], + dtype=np.float32, + ) + ok, rvec, tvec = cv2.solvePnP( + obj_points, + corners.reshape(-1, 2), + camera_matrix, + dist_coeffs, + flags=cv2.SOLVEPNP_IPPE_SQUARE, + ) + if not ok: + return None + return rvec.flatten(), tvec.flatten() + + +def collect_sample_interactive( + arm_positions_getter: Callable[[], np.ndarray | None], + camera_frame_getter: Callable[[], tuple[np.ndarray | None, np.ndarray, np.ndarray]], + marker_size: float, + aruco_dict_id: int = cv2.aruco.DICT_4X4_50, +) -> tuple[list[np.ndarray], list[np.ndarray], list[np.ndarray], list[np.ndarray]]: + """Interactively collect calibration samples. + + Returns: + R_gripper2base, t_gripper2base, R_target2cam, t_target2cam + """ + aruco_dict = cv2.aruco.getPredefinedDictionary(aruco_dict_id) + detector = cv2.aruco.ArucoDetector(aruco_dict, cv2.aruco.DetectorParameters()) + + R_gripper2base_list: list[np.ndarray] = [] + t_gripper2base_list: list[np.ndarray] = [] + R_target2cam_list: list[np.ndarray] = [] + t_target2cam_list: list[np.ndarray] = [] + + print("\n=== Eye-in-Hand Calibration ===") + print("Move the arm so the marker is visible from different angles.") + print("Press ENTER to capture a sample, 'q' to finish.\n") + + sample_idx = 0 + while True: + user_input = input(f"Sample {sample_idx + 1} — press Enter to capture, 'q' to finish: ") + if user_input.strip().lower() == "q": + break + + # Get EE pose from arm (4x4 homogeneous matrix) + ee_pose = arm_positions_getter() + if ee_pose is None: + print(" Failed to read arm pose, try again.") + continue + + # Get camera frame and detect marker + frame, camera_matrix, dist_coeffs = camera_frame_getter() + if frame is None: + print(" Failed to read camera frame, try again.") + continue + + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame + corners, ids, _ = detector.detectMarkers(gray) + if ids is None or len(ids) == 0: + print(" No markers detected, try again.") + continue + + # Use first detected marker + result = _estimate_marker_pose(corners[0], marker_size, camera_matrix, dist_coeffs) + if result is None: + print(" Failed to solve marker pose, try again.") + continue + rvec, tvec = result + + R_target, _ = cv2.Rodrigues(rvec) + + # EE pose decomposition + R_ee = ee_pose[:3, :3] + t_ee = ee_pose[:3, 3] + + R_gripper2base_list.append(R_ee) + t_gripper2base_list.append(t_ee.reshape(3, 1)) + R_target2cam_list.append(R_target) + t_target2cam_list.append(tvec.reshape(3, 1)) + + sample_idx += 1 + print( + f" Captured sample {sample_idx}: marker at [{tvec[0]:.3f}, {tvec[1]:.3f}, {tvec[2]:.3f}]" + ) + + return R_gripper2base_list, t_gripper2base_list, R_target2cam_list, t_target2cam_list + + +def solve_hand_eye( + R_gripper2base: list[np.ndarray], + t_gripper2base: list[np.ndarray], 
+ R_target2cam: list[np.ndarray], + t_target2cam: list[np.ndarray], + method: int = cv2.CALIB_HAND_EYE_TSAI, +) -> tuple[np.ndarray, np.ndarray]: + """Solve AX=XB for the camera-to-gripper transform. + + Args: + R_gripper2base: List of 3x3 rotation matrices (EE in base frame) + t_gripper2base: List of 3x1 translation vectors (EE in base frame) + R_target2cam: List of 3x3 rotation matrices (marker in camera frame) + t_target2cam: List of 3x1 translation vectors (marker in camera frame) + method: OpenCV hand-eye method (default: Tsai) + + Returns: + (R_cam2gripper, t_cam2gripper) — the 3x3 rotation and 3x1 translation + """ + R_cam2gripper, t_cam2gripper = cv2.calibrateHandEye( + R_gripper2base, + t_gripper2base, + R_target2cam, + t_target2cam, + method=method, + ) + return R_cam2gripper, t_cam2gripper + + +def save_calibration( + R: np.ndarray, + t: np.ndarray, + output_path: str | Path, + method: str = "tsai", + num_samples: int = 0, +) -> None: + """Save calibration result as JSON. + + The JSON contains translation (xyz) and rotation (quaternion xyzw), + suitable for use as base_transform in RealSenseCamera config. + """ + quat = Rotation.from_matrix(R).as_quat() # [x, y, z, w] + result = { + "transform": { + "translation": { + "x": float(t[0, 0]), + "y": float(t[1, 0]), + "z": float(t[2, 0]), + }, + "rotation": { + "x": float(quat[0]), + "y": float(quat[1]), + "z": float(quat[2]), + "w": float(quat[3]), + }, + }, + "metadata": { + "method": method, + "num_samples": num_samples, + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + "description": "EEF (gripper) to camera transform for eye-in-hand setup", + }, + } + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(result, indent=2) + "\n") + print(f"\nCalibration saved to {path}") + print(f" Translation: [{t[0, 0]:.6f}, {t[1, 0]:.6f}, {t[2, 0]:.6f}]") + print(f" Quaternion (xyzw): [{quat[0]:.6f}, {quat[1]:.6f}, {quat[2]:.6f}, {quat[3]:.6f}]") + + +def load_calibration(path: str | Path) -> tuple[list[float], list[float]]: + """Load calibration JSON and return (translation_xyz, quaternion_xyzw).""" + data = json.loads(Path(path).read_text()) + t = data["transform"]["translation"] + r = data["transform"]["rotation"] + return [t["x"], t["y"], t["z"]], [r["x"], r["y"], r["z"], r["w"]] + + +def calibration_to_transform(path: str | Path) -> tuple[Any, Any]: + """Load calibration and return (Vector3, Quaternion) for blueprint use. 
+ + Example: + from dimos.manipulation.dynamic_tracking.calibrate_hand_eye import calibration_to_transform + translation, rotation = calibration_to_transform("calibration.json") + # Use in RealSenseCamera blueprint: + # base_transform=Transform(translation=translation, rotation=rotation) + """ + from dimos.msgs.geometry_msgs import Quaternion, Vector3 + + xyz, xyzw = load_calibration(path) + return Vector3(*xyz), Quaternion(*xyzw) + + +def _reprojection_error( + R_cam2gripper: np.ndarray, + t_cam2gripper: np.ndarray, + R_gripper2base: list[np.ndarray], + t_gripper2base: list[np.ndarray], + R_target2cam: list[np.ndarray], + t_target2cam: list[np.ndarray], +) -> float: + """Compute mean reprojection error across all sample pairs.""" + errors = [] + n = len(R_gripper2base) + for i in range(n): + for j in range(i + 1, n): + # Expected: base_T_target should be consistent across all samples + T_base_cam_i = np.eye(4) + T_base_cam_i[:3, :3] = R_gripper2base[i] @ R_cam2gripper + T_base_cam_i[:3, 3] = (R_gripper2base[i] @ t_cam2gripper + t_gripper2base[i]).flatten() + + T_base_target_i = np.eye(4) + T_base_target_i[:3, :3] = T_base_cam_i[:3, :3] @ R_target2cam[i] + T_base_target_i[:3, 3] = ( + T_base_cam_i[:3, :3] @ t_target2cam[i] + T_base_cam_i[:3, 3:] + ).flatten() + + T_base_cam_j = np.eye(4) + T_base_cam_j[:3, :3] = R_gripper2base[j] @ R_cam2gripper + T_base_cam_j[:3, 3] = (R_gripper2base[j] @ t_cam2gripper + t_gripper2base[j]).flatten() + + T_base_target_j = np.eye(4) + T_base_target_j[:3, :3] = T_base_cam_j[:3, :3] @ R_target2cam[j] + T_base_target_j[:3, 3] = ( + T_base_cam_j[:3, :3] @ t_target2cam[j] + T_base_cam_j[:3, 3:] + ).flatten() + + pos_err = np.linalg.norm(T_base_target_i[:3, 3] - T_base_target_j[:3, 3]) + errors.append(pos_err) + + return float(np.mean(errors)) if errors else 0.0 + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Eye-in-hand calibration: compute EEF→camera transform" + ) + parser.add_argument("--arm-ip", type=str, default="192.168.1.210", help="XArm IP address") + parser.add_argument("--arm-dof", type=int, default=6, help="Arm degrees of freedom") + parser.add_argument("--marker-size", type=float, default=0.015, help="ArUco marker size (m)") + parser.add_argument( + "--aruco-dict", type=int, default=cv2.aruco.DICT_4X4_50, help="ArUco dict ID" + ) + parser.add_argument("--output", type=str, default="calibration.json", help="Output JSON path") + parser.add_argument( + "--method", + type=str, + default="tsai", + choices=["tsai", "park", "horaud", "andreff", "daniilidis"], + help="Hand-eye calibration method", + ) + args = parser.parse_args() + + method_map = { + "tsai": cv2.CALIB_HAND_EYE_TSAI, + "park": cv2.CALIB_HAND_EYE_PARK, + "horaud": cv2.CALIB_HAND_EYE_HORAUD, + "andreff": cv2.CALIB_HAND_EYE_ANDREFF, + "daniilidis": cv2.CALIB_HAND_EYE_DANIILIDIS, + } + + # --- Connect to arm --- + try: + from dimos.hardware.manipulators.xarm import XArmAdapter + + adapter = XArmAdapter(address=args.arm_ip, dof=args.arm_dof) + if not adapter.connect(): + print("Failed to connect to arm") + return 1 + print(f"Connected to XArm at {args.arm_ip}") + except ImportError: + print("XArm adapter not available. 
Install xArm SDK.") + return 1 + + # --- Connect to camera --- + try: + import pyrealsense2 as rs # type: ignore[import-not-found,import-untyped] + + pipeline = rs.pipeline() + config = rs.config() + config.enable_stream(rs.stream.color, 848, 480, rs.format.bgr8, 15) + profile = pipeline.start(config) + color_stream = profile.get_stream(rs.stream.color).as_video_stream_profile() + intrinsics = color_stream.get_intrinsics() + camera_matrix = np.array( + [[intrinsics.fx, 0, intrinsics.ppx], [0, intrinsics.fy, intrinsics.ppy], [0, 0, 1]], + dtype=np.float32, + ) + dist_coeffs = ( + np.array(intrinsics.coeffs, dtype=np.float32) + if intrinsics.coeffs + else np.zeros(5, dtype=np.float32) + ) + print("RealSense camera connected") + except ImportError: + print("pyrealsense2 not available. Install Intel RealSense SDK.") + return 1 + + def get_ee_pose() -> np.ndarray | None: + """Read current EE pose as 4x4 homogeneous matrix (meters, radians). + + XArmAdapter.read_cartesian_position() converts the SDK's mm→m and deg→rad, + so both the EE translation and the marker translation (from solvePnP with + marker_size in meters) are in the same unit system. + """ + try: + pose = adapter.read_cartesian_position() + if pose is None: + return None + R = Rotation.from_euler("xyz", [pose["roll"], pose["pitch"], pose["yaw"]]).as_matrix() + T = np.eye(4) + T[:3, :3] = R + T[:3, 3] = [pose["x"], pose["y"], pose["z"]] + return T + except Exception as e: + print(f"Failed to read EE pose: {e}") + return None + + def get_camera_frame() -> tuple[np.ndarray | None, np.ndarray, np.ndarray]: + """Capture a camera frame.""" + try: + frames = pipeline.wait_for_frames(timeout_ms=2000) + color_frame = frames.get_color_frame() + if not color_frame: + return None, camera_matrix, dist_coeffs + return np.asanyarray(color_frame.get_data()), camera_matrix, dist_coeffs + except Exception: + return None, camera_matrix, dist_coeffs + + # --- Collect samples --- + R_g2b, t_g2b, R_t2c, t_t2c = collect_sample_interactive( + get_ee_pose, get_camera_frame, args.marker_size, args.aruco_dict + ) + + if len(R_g2b) < 3: + print(f"\nNeed at least 3 samples, got {len(R_g2b)}. 
Aborting.") + pipeline.stop() + adapter.disconnect() + return 1 + + # --- Solve --- + print(f"\nSolving hand-eye calibration with {len(R_g2b)} samples ({args.method})...") + R_cam2gripper, t_cam2gripper = solve_hand_eye( + R_g2b, t_g2b, R_t2c, t_t2c, method=method_map[args.method] + ) + + error = _reprojection_error(R_cam2gripper, t_cam2gripper, R_g2b, t_g2b, R_t2c, t_t2c) + print(f"Mean pairwise target consistency error: {error * 1000:.2f} mm") + + save_calibration(R_cam2gripper, t_cam2gripper, args.output, args.method, len(R_g2b)) + + # Cleanup + pipeline.stop() + adapter.disconnect() + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index e82cb656ce..40250709c5 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -21,6 +21,9 @@ "arm-teleop-piper": "dimos.teleop.quest.blueprints:arm_teleop_piper", "arm-teleop-visualizing": "dimos.teleop.quest.blueprints:arm_teleop_visualizing", "arm-teleop-xarm7": "dimos.teleop.quest.blueprints:arm_teleop_xarm7", + "aruco-servo-mock": "dimos.manipulation.dynamic_tracking.blueprints:aruco_servo_mock", + "aruco-servo-xarm6": "dimos.manipulation.dynamic_tracking.blueprints:aruco_servo_xarm6", + "aruco-tracker-realsense": "dimos.manipulation.dynamic_tracking.blueprints:aruco_tracker_realsense", "coordinator-basic": "dimos.control.blueprints:coordinator_basic", "coordinator-cartesian-ik-mock": "dimos.control.blueprints:coordinator_cartesian_ik_mock", "coordinator-cartesian-ik-piper": "dimos.control.blueprints:coordinator_cartesian_ik_piper", @@ -98,6 +101,7 @@ all_modules = { "agent": "dimos.agents.agent", "arm-teleop-module": "dimos.teleop.quest.quest_extensions", + "aruco-tracker": "dimos.manipulation.dynamic_tracking.aruco_tracker", "camera-module": "dimos.hardware.sensors.camera.module", "cartesian-motion-controller": "dimos.manipulation.control.servo_control.cartesian_motion_controller", "control-coordinator": "dimos.control.coordinator",