diff --git a/data/.lfs/openarm_description.tar.gz b/data/.lfs/openarm_description.tar.gz new file mode 100644 index 0000000000..b4cf5e04e9 --- /dev/null +++ b/data/.lfs/openarm_description.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e5dcbc94024986a46414b81486c3e88883847db75dd6ee90dc5fa6b88536b20f +size 70064686 diff --git a/dimos/hardware/manipulators/openarm/adapter.py b/dimos/hardware/manipulators/openarm/adapter.py new file mode 100644 index 0000000000..f68aec32ce --- /dev/null +++ b/dimos/hardware/manipulators/openarm/adapter.py @@ -0,0 +1,416 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenArm ManipulatorAdapter — wraps the Damiao MIT-mode driver. SI units.""" + +from __future__ import annotations + +from pathlib import Path +import time +from typing import TYPE_CHECKING, Any + +import numpy as np + +from dimos.hardware.manipulators.openarm.driver import ( + CTRL_MODE_MIT, + DamiaoMotor, + MotorType, + OpenArmBus, +) +from dimos.hardware.manipulators.spec import ( + ControlMode, + JointLimits, + ManipulatorInfo, +) +from dimos.utils.data import LfsPath + +if TYPE_CHECKING: + from dimos.hardware.manipulators.registry import AdapterRegistry + + +def _socketcan_iface_up(name: str) -> bool: + """Return True if SocketCAN interface is present and UP. Reads /sys directly.""" + try: + flags_path = Path("/sys/class/net") / name / "flags" + if not flags_path.exists(): + return False + return (int(flags_path.read_text().strip(), 16) & 0x1) == 0x1 + except OSError: + return False + + +# OpenArm v10 BOM — (send_id, MotorType) per joint, derived from the torque +# column of data/openarm_description/config/arm/v10/joint_limits.yaml. +_OPENARM_V10_ARM_MOTORS: list[tuple[int, MotorType]] = [ + (0x01, MotorType.DM8006), # joint1 + (0x02, MotorType.DM8006), # joint2 + (0x03, MotorType.DM4340), # joint3 + (0x04, MotorType.DM4340), # joint4 + (0x05, MotorType.DM4310), # joint5 + (0x06, MotorType.DM4310), # joint6 + (0x07, MotorType.DM4310), # joint7 +] +# Gripper (motor id 0x08, DM4310) is on the bus but not currently wired up +# through the adapter — see the gripper-write methods which return None/False. + +# Physical joint limits (measured). Joints 1 & 2 are mirrored between sides. +_V10_POS_LOWER_LEFT = [-3.45, -3.30, -1.50, -0.01, -1.50, -0.75, -1.50] +_V10_POS_UPPER_LEFT = [1.35, 0.15, 1.50, 2.40, 1.50, 0.75, 1.50] +_V10_POS_LOWER_RIGHT = [-1.35, -0.15, -1.50, -0.01, -1.50, -0.75, -1.50] +_V10_POS_UPPER_RIGHT = [3.45, 3.30, 1.50, 2.40, 1.50, 0.75, 1.50] +_V10_VEL_MAX = [16.754666, 16.754666, 5.445426, 5.445426, 20.943946, 20.943946, 20.943946] + +# Default MIT gains per joint for POSITION mode. +# kp range is [0, 500], kd range is [0, 5]. +# With gravity compensation enabled, the PD gains only handle transient +# tracking — they don't fight gravity. Lower kp = smoother, less buzz. +# High kd causes high-frequency buzz/grinding from the gearbox. +_DEFAULT_KP = [100.0, 100.0, 80.0, 80.0, 60.0, 60.0, 60.0] +_DEFAULT_KD = [1.5, 1.5, 1.0, 1.0, 0.8, 0.8, 0.8] + + +class OpenArmAdapter: + """Adapter for one OpenArm (7 DOF) on a single SocketCAN bus. + + Key kwargs: + address: SocketCAN channel, e.g. "can0" + side: "left" or "right" (picks URDF for gravity comp) + kp / kd: per-joint MIT gains (optional override) + gravity_comp: Pinocchio G(q) feedforward torque (default True) + auto_set_mit_mode: write CTRL_MODE=MIT on connect (idempotent, default True) + fd: CAN-FD (False for gs_usb adapters, which don't support FD) + interface: python-can backend; "virtual" for unit tests + """ + + # Per-side URDFs for Pinocchio gravity model (LFS-backed) + _URDF_LEFT = LfsPath("openarm_description/urdf/robot/openarm_v10_left.urdf") + _URDF_RIGHT = LfsPath("openarm_description/urdf/robot/openarm_v10_right.urdf") + + def __init__( + self, + address: str = "can0", + dof: int = 7, + *, + side: str = "left", + fd: bool = False, + interface: str = "socketcan", + kp: list[float] | None = None, + kd: list[float] | None = None, + gravity_comp: bool = True, + auto_set_mit_mode: bool = True, + **_: Any, + ) -> None: + if dof != 7: + raise ValueError(f"OpenArmAdapter only supports 7 DOF (got {dof})") + if side not in ("left", "right"): + raise ValueError(f"side must be 'left' or 'right', got {side!r}") + self._address = address + self._dof = dof + self._side = side + self._fd = fd + self._interface = interface + self._kp = list(kp) if kp is not None else list(_DEFAULT_KP) + self._kd = list(kd) if kd is not None else list(_DEFAULT_KD) + if len(self._kp) != dof or len(self._kd) != dof: + raise ValueError("kp/kd must be length 7") + self._gravity_comp = gravity_comp + self._auto_set_mit_mode = auto_set_mit_mode + + self._motors = [DamiaoMotor(sid, mt) for sid, mt in _OPENARM_V10_ARM_MOTORS] + self._bus: OpenArmBus | None = None + self._control_mode: ControlMode = ControlMode.POSITION + self._enabled: bool = False + # Last successful position command — used as q_target for VELOCITY mode + self._last_cmd_q: list[float] | None = None + + # Pinocchio model for gravity compensation (loaded lazily in connect()) + self._pin_model: Any = None + self._pin_data: Any = None + + def connect(self) -> bool: + # Preflight: verify the SocketCAN interface is up before opening the bus. + # Bringing the interface up requires root privileges, so we don't do it + # here — just fail early with a helpful message. + if self._interface == "socketcan" and not _socketcan_iface_up(self._address): + print( + f"ERROR: SocketCAN interface '{self._address}' is not UP.\n" + f" Run: sudo ip link set {self._address} up type can bitrate 1000000\n" + f" (or: sudo ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh {self._address})" + ) + return False + + try: + self._bus = OpenArmBus( + channel=self._address, + motors=self._motors, + fd=self._fd, + interface=self._interface, + ) + self._bus.open() + except Exception as e: + print(f"ERROR: OpenArm {self._side}@{self._address} connect failed: {e}") + self._bus = None + return False + + # Ensure every motor is in MIT control mode. The write is idempotent + # (setting CTRL_MODE=MIT when it's already MIT is a no-op), so we + # write unconditionally rather than query-then-write. + if self._auto_set_mit_mode: + for m in self._motors: + self._bus.write_ctrl_mode(m.send_id, CTRL_MODE_MIT) + time.sleep(0.005) + else: + print( + f"OpenArm {self._side}@{self._address}: " + "auto_set_mit_mode disabled — relying on persisted register" + ) + + # Enable motors and wait for at least one state reply from each. + self._bus.enable_all() + self._enabled = True + if not self._bus.wait_all_states(timeout=0.5): + print( + f"WARNING: OpenArm {self._side}@{self._address}: not all motors " + "reported state within 0.5s — proceeding anyway" + ) + # Seed the position anchor from current hardware pose. + self._last_cmd_q = self.read_joint_positions() + + # Load Pinocchio model for gravity compensation + if self._gravity_comp: + try: + import pinocchio + + urdf = str(self._URDF_LEFT if self._side == "left" else self._URDF_RIGHT) + self._pin_model = pinocchio.buildModelFromUrdf(urdf) + self._pin_data = self._pin_model.createData() + print( + f"OpenArm {self._side}: gravity compensation enabled (nq={self._pin_model.nq})" + ) + except Exception as e: + print(f"WARNING: gravity comp disabled — {e}") + self._pin_model = None + self._pin_data = None + + return True + + def disconnect(self) -> None: + if self._bus is None: + return + try: + self._bus.disable_all() + except Exception: + pass + self._enabled = False + self._bus.close() + self._bus = None + + def is_connected(self) -> bool: + return self._bus is not None + + def get_info(self) -> ManipulatorInfo: + return ManipulatorInfo( + vendor="Enactic", + model=f"OpenArm v10 ({self._side})", + dof=self._dof, + firmware_version=None, + serial_number=None, + ) + + def get_dof(self) -> int: + return self._dof + + def get_limits(self) -> JointLimits: + if self._side == "left": + lower, upper = _V10_POS_LOWER_LEFT, _V10_POS_UPPER_LEFT + else: + lower, upper = _V10_POS_LOWER_RIGHT, _V10_POS_UPPER_RIGHT + return JointLimits( + position_lower=list(lower), + position_upper=list(upper), + velocity_max=list(_V10_VEL_MAX), + ) + + def set_control_mode(self, mode: ControlMode) -> bool: + # OpenArm runs exclusively in Damiao MIT register mode; we emulate + # dimos ControlModes by tuning kp/kd/q/dq/tau on each MIT frame. + # Cartesian/impedance control are outside this adapter's scope. + if mode in ( + ControlMode.POSITION, + ControlMode.SERVO_POSITION, + ControlMode.VELOCITY, + ControlMode.TORQUE, + ): + self._control_mode = mode + return True + return False + + def get_control_mode(self) -> ControlMode: + return self._control_mode + + def _states_or_raise(self) -> list[Any]: + if self._bus is None: + raise RuntimeError("OpenArmAdapter not connected") + return self._bus.get_states() + + def read_joint_positions(self) -> list[float]: + return [s.q if s is not None else 0.0 for s in self._states_or_raise()] + + def read_joint_velocities(self) -> list[float]: + return [s.dq if s is not None else 0.0 for s in self._states_or_raise()] + + def read_joint_efforts(self) -> list[float]: + return [s.tau if s is not None else 0.0 for s in self._states_or_raise()] + + def read_state(self) -> dict[str, int]: + if self._bus is None: + return {"state": 0, "mode": 0} + states = self._bus.get_states() + # report the hottest rotor temperature so callers can monitor thermal + # stress with a single scalar + t_rotor = max((s.t_rotor for s in states if s is not None), default=0) + return { + "state": 1 if self._enabled else 0, + "mode": 1, # MIT + "t_rotor_max": int(t_rotor), + } + + def read_error(self) -> tuple[int, str]: + # The Damiao motors don't report a structured error code in the state + # frame; over-temperature / over-torque are detected by the host from + # the normal state fields. Surface a soft thermal warning here. + if self._bus is None: + return 0, "" + states = self._bus.get_states() + t_rotor = max((s.t_rotor for s in states if s is not None), default=0) + if t_rotor >= 85: + return 1, f"rotor over-temperature ({t_rotor}°C)" + return 0, "" + + def _compute_gravity_torques(self, q: list[float]) -> list[float]: + """Pinocchio G(q), clamped to motor torque limits. Zero if model not loaded.""" + if self._pin_model is None or self._pin_data is None: + return [0.0] * self._dof + import pinocchio + + q_arr = np.array(q, dtype=np.float64) + tau_g = pinocchio.computeGeneralizedGravity(self._pin_model, self._pin_data, q_arr) + # Clamp to motor torque limits for safety + limits = [m.limits for m in self._motors] # (p_max, v_max, t_max) + return [float(np.clip(tau_g[i], -lim[2], lim[2])) for i, lim in enumerate(limits)] + + def write_joint_positions( + self, + positions: list[float], + velocity: float = 1.0, + ) -> bool: + if self._bus is None or not self._enabled: + return False + if len(positions) != self._dof: + return False + velocity = max(0.0, min(1.0, velocity)) + # Gravity feedforward: compute tau needed to hold the arm at the + # current configuration. The PD gains handle the rest. + q_current = self.read_joint_positions() + tau_ff = self._compute_gravity_torques(q_current) + commands = [ + (q, 0.0, kp * velocity, kd, tau) + for q, kp, kd, tau in zip(positions, self._kp, self._kd, tau_ff, strict=False) + ] + self._bus.send_mit_many(commands) + self._last_cmd_q = list(positions) + return True + + def write_joint_velocities(self, velocities: list[float]) -> bool: + # MIT velocity tracking: kp=0, send dq directly, anchor q at the + # last-commanded position so the motor doesn't drift. Gravity + # feedforward is still needed — with kp=0 the only restoring force + # is damping, so without tau_ff the arm droops under its own weight. + if self._bus is None or not self._enabled: + return False + if len(velocities) != self._dof: + return False + if self._last_cmd_q is None: + self._last_cmd_q = self.read_joint_positions() + anchor = self._last_cmd_q + q_current = self.read_joint_positions() + tau_ff = self._compute_gravity_torques(q_current) + commands = [ + (q_anchor, dq, 0.0, kd, tau) + for q_anchor, dq, kd, tau in zip(anchor, velocities, self._kd, tau_ff, strict=False) + ] + self._bus.send_mit_many(commands) + return True + + def write_stop(self) -> bool: + if self._bus is None: + return False + try: + q_now = self.read_joint_positions() + except Exception: + q_now = [0.0] * self._dof + tau_ff = self._compute_gravity_torques(q_now) + commands = [ + (q, 0.0, kp, kd, tau) + for q, kp, kd, tau in zip(q_now, self._kp, self._kd, tau_ff, strict=False) + ] + self._bus.send_mit_many(commands) + self._last_cmd_q = q_now + return True + + def write_enable(self, enable: bool) -> bool: + if self._bus is None: + return False + if enable: + self._bus.enable_all() + else: + self._bus.disable_all() + self._enabled = enable + return True + + def read_enabled(self) -> bool: + return self._enabled + + def write_clear_errors(self) -> bool: + # Damiao motors have no separate clear-error command; re-enabling + # after a fault is the recovery path. + if self._bus is None: + return False + self._bus.disable_all() + self._bus.enable_all() + self._enabled = True + return True + + def read_cartesian_position(self) -> dict[str, float] | None: + return None + + def write_cartesian_position(self, pose: dict[str, float], velocity: float = 1.0) -> bool: + return False + + def read_gripper_position(self) -> float | None: + return None + + def write_gripper_position(self, position: float) -> bool: + return False + + def read_force_torque(self) -> list[float] | None: + return None + + +# ── Registry hook (required for auto-discovery) ─────────────────── +def register(registry: AdapterRegistry) -> None: + registry.register("openarm", OpenArmAdapter) + + +__all__ = ["OpenArmAdapter", "register"] diff --git a/dimos/hardware/manipulators/openarm/driver.py b/dimos/hardware/manipulators/openarm/driver.py new file mode 100644 index 0000000000..5408187647 --- /dev/null +++ b/dimos/hardware/manipulators/openarm/driver.py @@ -0,0 +1,398 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Damiao MIT-mode CAN driver for OpenArm. SI units throughout. + +Ported from ``enactic/openarm_can`` (C++). No dimos deps — testable with +``can.Bus(interface="virtual")``. +""" + +from __future__ import annotations + +from dataclasses import dataclass +import enum +import errno +import struct +import threading +import time +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import can + + +class MotorType(str, enum.Enum): + """Damiao motor types used on OpenArm. Values match the reference library.""" + + DM3507 = "DM3507" + DM4310 = "DM4310" + DM4310_48V = "DM4310_48V" + DM4340 = "DM4340" + DM4340_48V = "DM4340_48V" + DM6006 = "DM6006" + DM8006 = "DM8006" + DM8009 = "DM8009" + DM10010L = "DM10010L" + DM10010 = "DM10010" + DMH3510 = "DMH3510" + DMH6215 = "DMH6215" + DMG6220 = "DMG6220" + + +# (p_max [rad], v_max [rad/s], t_max [Nm]) +_MOTOR_LIMITS: dict[MotorType, tuple[float, float, float]] = { + MotorType.DM3507: (12.5, 50.0, 5.0), + MotorType.DM4310: (12.5, 30.0, 10.0), + MotorType.DM4310_48V: (12.5, 50.0, 10.0), + MotorType.DM4340: (12.5, 8.0, 28.0), + MotorType.DM4340_48V: (12.5, 10.0, 28.0), + MotorType.DM6006: (12.5, 45.0, 20.0), + MotorType.DM8006: (12.5, 45.0, 40.0), + MotorType.DM8009: (12.5, 45.0, 54.0), + MotorType.DM10010L: (12.5, 25.0, 200.0), + MotorType.DM10010: (12.5, 20.0, 200.0), + MotorType.DMH3510: (12.5, 280.0, 1.0), + MotorType.DMH6215: (12.5, 45.0, 10.0), + MotorType.DMG6220: (12.5, 45.0, 10.0), +} + +# MIT gain ranges (protocol-fixed, same for every motor type) +KP_MIN, KP_MAX = 0.0, 500.0 +KD_MIN, KD_MAX = 0.0, 5.0 + +# Broadcast/control CAN IDs +_BROADCAST_ID = 0x7FF +_CMD_ENABLE = 0xFC +_CMD_DISABLE = 0xFD +_CMD_SET_ZERO = 0xFE +_RID_CTRL_MODE = 10 +CTRL_MODE_MIT = 1 + + +def _clamp(x: float, lo: float, hi: float) -> float: + if x < lo: + return lo + if x > hi: + return hi + return x + + +def float_to_uint(x: float, lo: float, hi: float, bits: int) -> int: + x = _clamp(x, lo, hi) + return int((x - lo) / (hi - lo) * ((1 << bits) - 1)) + + +def uint_to_float(u: int, lo: float, hi: float, bits: int) -> float: + return u / ((1 << bits) - 1) * (hi - lo) + lo + + +def pack_mit_frame( + motor_type: MotorType, + q: float, + dq: float, + kp: float, + kd: float, + tau: float, +) -> bytes: + p_max, v_max, t_max = _MOTOR_LIMITS[motor_type] + q_u = float_to_uint(q, -p_max, p_max, 16) + dq_u = float_to_uint(dq, -v_max, v_max, 12) + kp_u = float_to_uint(kp, KP_MIN, KP_MAX, 12) + kd_u = float_to_uint(kd, KD_MIN, KD_MAX, 12) + tau_u = float_to_uint(tau, -t_max, t_max, 12) + return bytes( + [ + (q_u >> 8) & 0xFF, + q_u & 0xFF, + (dq_u >> 4) & 0xFF, + ((dq_u & 0xF) << 4) | ((kp_u >> 8) & 0xF), + kp_u & 0xFF, + (kd_u >> 4) & 0xFF, + ((kd_u & 0xF) << 4) | ((tau_u >> 8) & 0xF), + tau_u & 0xFF, + ] + ) + + +@dataclass(frozen=True) +class MotorState: + """Decoded state from a Damiao reply frame.""" + + q: float # rad + dq: float # rad/s + tau: float # Nm + t_mos: int # °C + t_rotor: int # °C + timestamp: float # monotonic seconds when received + + +def parse_state_frame(motor_type: MotorType, data: bytes) -> MotorState | None: + """Decode an 8-byte Damiao state reply. Returns None if too short.""" + if len(data) < 8: + return None + p_max, v_max, t_max = _MOTOR_LIMITS[motor_type] + q_u = (data[1] << 8) | data[2] + dq_u = (data[3] << 4) | (data[4] >> 4) + tau_u = ((data[4] & 0x0F) << 8) | data[5] + return MotorState( + q=uint_to_float(q_u, -p_max, p_max, 16), + dq=uint_to_float(dq_u, -v_max, v_max, 12), + tau=uint_to_float(tau_u, -t_max, t_max, 12), + t_mos=int(data[6]), + t_rotor=int(data[7]), + timestamp=time.monotonic(), + ) + + +def _pack_control_command(cmd: int) -> bytes: + return bytes([0xFF] * 7 + [cmd & 0xFF]) + + +def pack_write_param_frame(send_id: int, rid: int, value_u32: int) -> bytes: + """Broadcast parameter-write frame sent to CAN id 0x7FF.""" + val = struct.pack("> 8) & 0xFF, + 0x55, + rid & 0xFF, + val[0], + val[1], + val[2], + val[3], + ] + ) + + +@dataclass(frozen=True) +class DamiaoMotor: + """One Damiao motor on a CAN bus. recv_id defaults to send_id | 0x10.""" + + send_id: int + motor_type: MotorType + recv_id: int | None = None + + @property + def effective_recv_id(self) -> int: + return self.recv_id if self.recv_id is not None else (self.send_id | 0x10) + + @property + def limits(self) -> tuple[float, float, float]: + return _MOTOR_LIMITS[self.motor_type] + + +class OpenArmBus: + """One SocketCAN bus with a background RX thread caching latest state.""" + + def __init__( + self, + channel: str, + motors: list[DamiaoMotor], + *, + fd: bool = False, + interface: str = "socketcan", + ) -> None: + if not motors: + raise ValueError("OpenArmBus needs at least one motor") + # Enforce unique IDs — silent overlap would make state routing ambiguous. + send_ids = [m.send_id for m in motors] + if len(set(send_ids)) != len(send_ids): + raise ValueError(f"duplicate send_id in {send_ids}") + recv_ids = [m.effective_recv_id for m in motors] + if len(set(recv_ids)) != len(recv_ids): + raise ValueError(f"duplicate recv_id in {recv_ids}") + + self._channel = channel + self._motors = list(motors) + self._fd = fd + self._interface = interface + self._by_recv: dict[int, DamiaoMotor] = {m.effective_recv_id: m for m in motors} + + self._bus: can.BusABC | None = None + self._rx_thread: threading.Thread | None = None + self._rx_stop = threading.Event() + self._state_lock = threading.Lock() + self._states: dict[int, MotorState] = {} + + def open(self) -> None: + """Open the CAN bus and start the background RX thread.""" + if self._bus is not None: + return + import can # local import — python-can is optional + + self._bus = can.Bus(interface=self._interface, channel=self._channel, fd=self._fd) + self._rx_stop.clear() + self._rx_thread = threading.Thread( + target=self._rx_loop, name=f"openarm-rx-{self._channel}", daemon=True + ) + self._rx_thread.start() + + def close(self) -> None: + """Stop the RX thread and close the CAN bus.""" + self._rx_stop.set() + if self._rx_thread is not None: + self._rx_thread.join(timeout=1.0) + self._rx_thread = None + if self._bus is not None: + try: + self._bus.shutdown() + finally: + self._bus = None + + def __enter__(self) -> OpenArmBus: + self.open() + return self + + def __exit__(self, *_exc: object) -> None: + self.close() + + def enable_all(self) -> None: + for m in self._motors: + self._send_raw(m.send_id, _pack_control_command(_CMD_ENABLE)) + time.sleep(0.002) # 2ms inter-frame gap — gs_usb TX buffer is tiny + + def disable_all(self) -> None: + for m in self._motors: + self._send_raw(m.send_id, _pack_control_command(_CMD_DISABLE)) + time.sleep(0.002) + + def set_zero(self, send_id: int) -> None: + """Set current physical position as the motor's zero. Destructive.""" + self._send_raw(send_id, _pack_control_command(_CMD_SET_ZERO)) + + def write_ctrl_mode(self, send_id: int, mode: int = CTRL_MODE_MIT) -> None: + self._send_raw( + _BROADCAST_ID, + pack_write_param_frame(send_id, _RID_CTRL_MODE, mode), + ) + + def send_mit( + self, + send_id: int, + q: float, + dq: float, + kp: float, + kd: float, + tau: float, + ) -> None: + motor = next((m for m in self._motors if m.send_id == send_id), None) + if motor is None: + raise KeyError(f"motor 0x{send_id:02X} not on bus {self._channel}") + data = pack_mit_frame(motor.motor_type, q, dq, kp, kd, tau) + self._send_raw(send_id, data) + + def send_mit_many( + self, + commands: list[tuple[float, float, float, float, float]], + ) -> None: + """One MIT frame per motor; commands[i] → self.motors[i] = (q, dq, kp, kd, tau).""" + if len(commands) != len(self._motors): + raise ValueError(f"expected {len(self._motors)} commands, got {len(commands)}") + for i, (motor, cmd) in enumerate(zip(self._motors, commands, strict=False)): + q, dq, kp, kd, tau = cmd + data = pack_mit_frame(motor.motor_type, q, dq, kp, kd, tau) + self._send_raw(motor.send_id, data) + # Tiny inter-frame gap to avoid TX buffer overflow on gs_usb. + # 7 frames × 0.5ms = 3.5ms total, well within a 10ms tick. + if i < len(self._motors) - 1: + time.sleep(0.0005) + + @property + def motors(self) -> tuple[DamiaoMotor, ...]: + return tuple(self._motors) + + def get_state(self, send_id: int) -> MotorState | None: + motor = next((m for m in self._motors if m.send_id == send_id), None) + if motor is None: + return None + with self._state_lock: + return self._states.get(motor.effective_recv_id) + + def get_states(self) -> list[MotorState | None]: + with self._state_lock: + return [self._states.get(m.effective_recv_id) for m in self._motors] + + def wait_all_states(self, timeout: float = 0.5) -> bool: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + with self._state_lock: + if all(m.effective_recv_id in self._states for m in self._motors): + return True + time.sleep(0.005) + return False + + def _send_raw(self, arbitration_id: int, data: bytes) -> None: + if self._bus is None: + raise RuntimeError("bus not open — call .open() first") + import can + + msg = can.Message( + arbitration_id=arbitration_id, + data=data, + is_extended_id=False, + is_fd=self._fd, + bitrate_switch=self._fd, + ) + # Retry on TX buffer full (ENOBUFS) — the gs_usb adapter has a + # tiny kernel-side TX queue. A short backoff lets the kernel drain + # one frame before we try again. Check errno on the underlying + # cause rather than string-matching the python-can message. + for attempt in range(4): + try: + self._bus.send(msg) + return + except can.CanOperationError as e: + cause = e.__cause__ or e + is_enobufs = ( + getattr(cause, "errno", None) == errno.ENOBUFS + or "105" in str(e) # fallback if errno not exposed + ) + if is_enobufs and attempt < 3: + time.sleep(0.001 * (attempt + 1)) + else: + raise + + def _rx_loop(self) -> None: + assert self._bus is not None + while not self._rx_stop.is_set(): + msg = self._bus.recv(timeout=0.05) + if msg is None: + continue + motor = self._by_recv.get(int(msg.arbitration_id)) + if motor is None: + continue + state = parse_state_frame(motor.motor_type, bytes(msg.data)) + if state is None: + continue + with self._state_lock: + self._states[motor.effective_recv_id] = state + + +__all__ = [ + "CTRL_MODE_MIT", + "KD_MAX", + "KD_MIN", + "KP_MAX", + "KP_MIN", + "DamiaoMotor", + "MotorState", + "MotorType", + "OpenArmBus", + "float_to_uint", + "pack_mit_frame", + "pack_write_param_frame", + "parse_state_frame", + "uint_to_float", +] diff --git a/dimos/hardware/manipulators/openarm/test_driver.py b/dimos/hardware/manipulators/openarm/test_driver.py new file mode 100644 index 0000000000..c65a972bd6 --- /dev/null +++ b/dimos/hardware/manipulators/openarm/test_driver.py @@ -0,0 +1,270 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the Damiao MIT-mode driver — no hardware required. + +Uses ``can.Bus(interface="virtual")`` for loopback. +""" + +from __future__ import annotations + +import struct +import time + +import pytest + +can = pytest.importorskip("can") + +from dimos.hardware.manipulators.openarm.driver import ( + CTRL_MODE_MIT, + KD_MAX, + KP_MAX, + DamiaoMotor, + MotorType, + OpenArmBus, + float_to_uint, + pack_mit_frame, + pack_write_param_frame, + parse_state_frame, + uint_to_float, +) + + +def test_float_to_uint_endpoints_and_roundtrip() -> None: + # Endpoints + assert float_to_uint(-12.5, -12.5, 12.5, 16) == 0 + assert float_to_uint(12.5, -12.5, 12.5, 16) == (1 << 16) - 1 + # Midpoint is half the full range (rounded down) + mid = float_to_uint(0.0, -12.5, 12.5, 16) + assert mid in ((1 << 16) // 2 - 1, (1 << 16) // 2) + # Out-of-range clamps + assert float_to_uint(-100.0, -12.5, 12.5, 16) == 0 + assert float_to_uint(100.0, -12.5, 12.5, 16) == (1 << 16) - 1 + + +def test_roundtrip_all_gain_ranges() -> None: + # Quantization error should be tiny + for bits, lo, hi in [(16, -12.5, 12.5), (12, 0.0, KP_MAX), (12, 0.0, KD_MAX)]: + step = (hi - lo) / ((1 << bits) - 1) + for k in range(0, 1 << bits, max(1, (1 << bits) // 50)): + x = lo + k * step + u = float_to_uint(x, lo, hi, bits) + x2 = uint_to_float(u, lo, hi, bits) + assert abs(x - x2) <= step + + +def test_mit_frame_kp_kd_zero_and_pos_zero() -> None: + # q=dq=kp=kd=tau=0 → q_u = 32767 (16-bit midpoint), dq_u = 2047 (12-bit), + # tau_u = 2047. kp_u = kd_u = 0 (min of their 0-positive range). + data = pack_mit_frame(MotorType.DM4310, 0.0, 0.0, 0.0, 0.0, 0.0) + assert len(data) == 8 + # Reconstruct fields from bytes + q_u = (data[0] << 8) | data[1] + dq_u = (data[2] << 4) | (data[3] >> 4) + kp_u = ((data[3] & 0xF) << 8) | data[4] + kd_u = (data[5] << 4) | (data[6] >> 4) + tau_u = ((data[6] & 0xF) << 8) | data[7] + assert kp_u == 0 + assert kd_u == 0 + # 16-bit midpoint of symmetric range + assert q_u in (32767, 32768) + assert dq_u in (2047, 2048) + assert tau_u in (2047, 2048) + + +def test_mit_frame_full_positive() -> None: + # Command at every max → every _u field saturates. + data = pack_mit_frame(MotorType.DM4310, 12.5, 30.0, 500.0, 5.0, 10.0) + q_u = (data[0] << 8) | data[1] + dq_u = (data[2] << 4) | (data[3] >> 4) + kp_u = ((data[3] & 0xF) << 8) | data[4] + kd_u = (data[5] << 4) | (data[6] >> 4) + tau_u = ((data[6] & 0xF) << 8) | data[7] + assert q_u == 0xFFFF + assert dq_u == 0xFFF + assert kp_u == 0xFFF + assert kd_u == 0xFFF + assert tau_u == 0xFFF + + +def test_parse_state_roundtrip() -> None: + # Build a synthetic reply frame with known values and verify decode. + # Byte layout for state: [echo, q_hi, q_lo, dq_hi, dq_lo|tau_hi, tau_lo, t_mos, t_rotor] + motor = MotorType.DM4340 + p_max, v_max, t_max = 12.5, 8.0, 28.0 + q_u = float_to_uint(0.3, -p_max, p_max, 16) + dq_u = float_to_uint(-1.0, -v_max, v_max, 12) + tau_u = float_to_uint(2.0, -t_max, t_max, 12) + data = bytes( + [ + 0x03, + (q_u >> 8) & 0xFF, + q_u & 0xFF, + (dq_u >> 4) & 0xFF, + ((dq_u & 0xF) << 4) | ((tau_u >> 8) & 0xF), + tau_u & 0xFF, + 33, + 28, + ] + ) + state = parse_state_frame(motor, data) + assert state is not None + assert abs(state.q - 0.3) < 0.001 + assert abs(state.dq - (-1.0)) < 0.01 + assert abs(state.tau - 2.0) < 0.02 + assert state.t_mos == 33 + assert state.t_rotor == 28 + + +def test_parse_state_rejects_short_frames() -> None: + assert parse_state_frame(MotorType.DM4310, b"\x00" * 4) is None + + +def test_pack_write_param_ctrl_mode_mit() -> None: + data = pack_write_param_frame(0x05, 10, CTRL_MODE_MIT) + assert data[0] == 0x05 + assert data[1] == 0x00 + assert data[2] == 0x55 + assert data[3] == 10 + assert struct.unpack(" OpenArmBus: + return OpenArmBus(channel=channel, motors=motors, fd=False, interface="virtual") + + +def test_bus_validates_unique_ids() -> None: + with pytest.raises(ValueError, match="duplicate send_id"): + OpenArmBus( + channel="v0", + motors=[ + DamiaoMotor(0x01, MotorType.DM4310), + DamiaoMotor(0x01, MotorType.DM4310), + ], + fd=False, + interface="virtual", + ) + + +def test_bus_empty_motor_list_rejected() -> None: + with pytest.raises(ValueError): + OpenArmBus(channel="v0", motors=[], fd=False, interface="virtual") + + +def test_rx_thread_populates_state_cache() -> None: + # Two peers on the same virtual channel loop back to each other. + motors = [ + DamiaoMotor(0x01, MotorType.DM8006), + DamiaoMotor(0x05, MotorType.DM4310), + ] + bus = _make_bus("openarm-test-rx", motors) + # A raw sender on the same virtual channel injects state replies. + sender = can.Bus(interface="virtual", channel="openarm-test-rx") + try: + bus.open() + # Forge a reply for motor 0x01 (recv 0x11) at q = 0.25 rad + q_u = float_to_uint(0.25, -12.5, 12.5, 16) + dq_u = float_to_uint(0.0, -45.0, 45.0, 12) + tau_u = float_to_uint(0.0, -40.0, 40.0, 12) + payload = bytes( + [ + 0x01, + (q_u >> 8) & 0xFF, + q_u & 0xFF, + (dq_u >> 4) & 0xFF, + ((dq_u & 0xF) << 4) | ((tau_u >> 8) & 0xF), + tau_u & 0xFF, + 30, + 28, + ] + ) + sender.send(can.Message(arbitration_id=0x11, data=payload, is_extended_id=False)) + # Poll briefly for the RX thread to consume it + deadline = time.monotonic() + 0.5 + s = None + while s is None and time.monotonic() < deadline: + s = bus.get_state(0x01) + time.sleep(0.01) + assert s is not None, "RX thread did not pick up synthetic state reply" + assert abs(s.q - 0.25) < 0.001 + # Motor 0x05 never got a reply → state should still be None + assert bus.get_state(0x05) is None + finally: + bus.close() + sender.shutdown() + + +def test_send_mit_many_fans_out_one_per_motor() -> None: + motors = [ + DamiaoMotor(0x01, MotorType.DM8006), + DamiaoMotor(0x02, MotorType.DM8006), + DamiaoMotor(0x05, MotorType.DM4310), + ] + bus = _make_bus("openarm-test-send", motors) + listener = can.Bus(interface="virtual", channel="openarm-test-send") + try: + bus.open() + bus.send_mit_many( + [ + (0.1, 0.0, 10.0, 0.5, 0.0), + (0.2, 0.0, 10.0, 0.5, 0.0), + (0.3, 0.0, 10.0, 0.5, 0.0), + ] + ) + seen_ids: set[int] = set() + deadline = time.monotonic() + 0.5 + while len(seen_ids) < 3 and time.monotonic() < deadline: + msg = listener.recv(timeout=0.1) + if msg is not None: + seen_ids.add(int(msg.arbitration_id)) + assert seen_ids == {0x01, 0x02, 0x05} + finally: + bus.close() + listener.shutdown() + + +def test_send_mit_many_size_mismatch() -> None: + bus = _make_bus( + "openarm-test-mismatch", + [DamiaoMotor(0x01, MotorType.DM4310), DamiaoMotor(0x02, MotorType.DM4310)], + ) + try: + bus.open() + with pytest.raises(ValueError): + bus.send_mit_many([(0.0, 0.0, 0.0, 0.0, 0.0)]) + finally: + bus.close() + + +def test_enable_disable_frames_sent() -> None: + bus = _make_bus( + "openarm-test-enable", + [DamiaoMotor(0x01, MotorType.DM4310), DamiaoMotor(0x05, MotorType.DM4310)], + ) + listener = can.Bus(interface="virtual", channel="openarm-test-enable") + try: + bus.open() + bus.enable_all() + seen = {} + deadline = time.monotonic() + 0.3 + while len(seen) < 2 and time.monotonic() < deadline: + msg = listener.recv(timeout=0.1) + if msg is not None: + seen[int(msg.arbitration_id)] = bytes(msg.data) + assert set(seen) == {0x01, 0x05} + for data in seen.values(): + assert data == bytes([0xFF] * 7 + [0xFC]) + finally: + bus.close() + listener.shutdown() diff --git a/dimos/perception/detection/type/imageDetections.py b/dimos/perception/detection/type/imageDetections.py index 1eea0c9c3c..e3212a7804 100644 --- a/dimos/perception/detection/type/imageDetections.py +++ b/dimos/perception/detection/type/imageDetections.py @@ -19,6 +19,11 @@ import sys from typing import TYPE_CHECKING, Generic, TypeVar +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + if sys.version_info >= (3, 11): from typing import Self else: diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 96ce1c6784..c794a67124 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -25,6 +25,10 @@ "coordinator-mobile-manip-mock": "dimos.control.blueprints.mobile:coordinator_mobile_manip_mock", "coordinator-mock": "dimos.control.blueprints.basic:coordinator_mock", "coordinator-mock-twist-base": "dimos.control.blueprints.mobile:coordinator_mock_twist_base", + "coordinator-openarm-bimanual": "dimos.robot.manipulators.openarm.blueprints:coordinator_openarm_bimanual", + "coordinator-openarm-left": "dimos.robot.manipulators.openarm.blueprints:coordinator_openarm_left", + "coordinator-openarm-mock": "dimos.robot.manipulators.openarm.blueprints:coordinator_openarm_mock", + "coordinator-openarm-right": "dimos.robot.manipulators.openarm.blueprints:coordinator_openarm_right", "coordinator-piper": "dimos.control.blueprints.basic:coordinator_piper", "coordinator-piper-xarm": "dimos.control.blueprints.dual:coordinator_piper_xarm", "coordinator-servo-xarm6": "dimos.control.blueprints.teleop:coordinator_servo_xarm6", @@ -49,6 +53,8 @@ "drone-agentic": "dimos.robot.drone.blueprints.agentic.drone_agentic:drone_agentic", "drone-basic": "dimos.robot.drone.blueprints.basic.drone_basic:drone_basic", "dual-xarm6-planner": "dimos.manipulation.blueprints:dual_xarm6_planner", + "keyboard-teleop-openarm": "dimos.robot.manipulators.openarm.blueprints:keyboard_teleop_openarm", + "keyboard-teleop-openarm-mock": "dimos.robot.manipulators.openarm.blueprints:keyboard_teleop_openarm_mock", "keyboard-teleop-piper": "dimos.robot.manipulators.piper.blueprints:keyboard_teleop_piper", "keyboard-teleop-xarm6": "dimos.robot.manipulators.xarm.blueprints:keyboard_teleop_xarm6", "keyboard-teleop-xarm7": "dimos.robot.manipulators.xarm.blueprints:keyboard_teleop_xarm7", @@ -56,6 +62,8 @@ "mid360-fastlio": "dimos.hardware.sensors.lidar.fastlio2.fastlio_blueprints:mid360_fastlio", "mid360-fastlio-voxels": "dimos.hardware.sensors.lidar.fastlio2.fastlio_blueprints:mid360_fastlio_voxels", "mid360-fastlio-voxels-native": "dimos.hardware.sensors.lidar.fastlio2.fastlio_blueprints:mid360_fastlio_voxels_native", + "openarm-mock-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_mock_planner_coordinator", + "openarm-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_planner_coordinator", "teleop-phone": "dimos.teleop.phone.blueprints:teleop_phone", "teleop-phone-go2": "dimos.teleop.phone.blueprints:teleop_phone_go2", "teleop-phone-go2-fleet": "dimos.teleop.phone.blueprints:teleop_phone_go2_fleet", diff --git a/dimos/robot/catalog/openarm.py b/dimos/robot/catalog/openarm.py new file mode 100644 index 0000000000..b6e1238cf2 --- /dev/null +++ b/dimos/robot/catalog/openarm.py @@ -0,0 +1,120 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenArm v10 robot configurations.""" + +from __future__ import annotations + +from typing import Any + +from dimos.robot.config import RobotConfig +from dimos.utils.data import LfsPath + +# Collision exclusion pairs — structural mesh overlaps in the OpenArm URDF. +# link5 and link7 collision meshes overlap by ~3mm at zero pose (and every +# other pose) — same pattern as R1 Pro's non-adjacent link overlap. +OPENARM_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ + ("openarm_left_link5", "openarm_left_link7"), + ("openarm_right_link5", "openarm_right_link7"), +] + +# LFS-backed: data/.lfs/openarm_description.tar.gz extracts to data/openarm_description/ +_OPENARM_PKG = LfsPath("openarm_description") +_OPENARM_MODEL_PATH = _OPENARM_PKG / "urdf/robot/openarm_v10_bimanual.urdf" +# Per-side URDFs: extracted from bimanual expansion, only one arm + torso each. +# Avoids phantom-arm collisions when Drake loads both sides into one world. +_OPENARM_LEFT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_left.urdf" +_OPENARM_RIGHT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_right.urdf" + +# Pre-expanded single-arm URDF for Pinocchio FK (keyboard teleop, IK, etc.) +OPENARM_V10_FK_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_single.urdf" + + +def openarm_arm( + side: str = "left", + name: str | None = None, + *, + adapter_type: str = "mock", + address: str | None = None, + **overrides: Any, +) -> RobotConfig: + """OpenArm v10 config for one side. Uses per-side URDF (arm + torso only).""" + if side not in ("left", "right"): + raise ValueError(f"side must be 'left' or 'right', got {side!r}") + + resolved_name = name or f"{side}_arm" + # Pre-expanded bimanual URDF uses openarm_{side}_* naming. + joint_names = [f"openarm_{side}_joint{i}" for i in range(1, 8)] + ee_link = f"openarm_{side}_link7" + + defaults: dict[str, Any] = { + "name": resolved_name, + "model_path": _OPENARM_LEFT_MODEL if side == "left" else _OPENARM_RIGHT_MODEL, + "end_effector_link": ee_link, + "adapter_type": adapter_type, + "address": address, + "joint_names": joint_names, + # URDF already prefixes joints with "left_"/"right_" in bimanual mode, + # so suppress RobotConfig's automatic "{name}_" prefix. + "joint_prefix": "", + "base_link": "openarm_body_link0", + "home_joints": [0.0] * 7, + "base_pose": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0], + "package_paths": {"openarm_description": _OPENARM_PKG}, + "collision_exclusion_pairs": OPENARM_COLLISION_EXCLUSIONS, + "auto_convert_meshes": True, + "max_velocity": 0.5, + "max_acceleration": 1.0, + "adapter_kwargs": {"side": side}, + } + # Merge adapter_kwargs rather than replace, so callers can add keys + # (e.g. auto_set_mit_mode) without clobbering the catalog's "side". + if "adapter_kwargs" in overrides: + defaults["adapter_kwargs"] = { + **defaults["adapter_kwargs"], + **overrides.pop("adapter_kwargs"), + } + defaults.update(overrides) + return RobotConfig(**defaults) + + +def openarm_single( + name: str = "arm", + *, + adapter_type: str = "mock", + address: str | None = None, + **overrides: Any, +) -> RobotConfig: + """Single-arm config (keyboard teleop, cartesian IK). Use openarm_arm() for bimanual.""" + defaults: dict[str, Any] = { + "name": name, + "model_path": OPENARM_V10_FK_MODEL, + "end_effector_link": "openarm_left_link7", + "adapter_type": adapter_type, + "address": address, + "joint_names": [f"openarm_left_joint{i}" for i in range(1, 8)], + "joint_prefix": "", + "base_link": "openarm_body_link0", + "home_joints": [0.0] * 7, + "package_paths": {"openarm_description": _OPENARM_PKG}, + "auto_convert_meshes": True, + "max_velocity": 0.5, + "max_acceleration": 1.0, + "adapter_kwargs": {"side": "left"}, + } + defaults.update(overrides) + return RobotConfig(**defaults) + + +__all__ = ["OPENARM_V10_FK_MODEL", "openarm_arm", "openarm_single"] diff --git a/dimos/robot/manipulators/openarm/blueprints.py b/dimos/robot/manipulators/openarm/blueprints.py new file mode 100644 index 0000000000..d0075677d9 --- /dev/null +++ b/dimos/robot/manipulators/openarm/blueprints.py @@ -0,0 +1,232 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenArm blueprints. Flip LEFT_CAN / RIGHT_CAN below if arms come up swapped.""" + +from __future__ import annotations + +from dimos.control.coordinator import ControlCoordinator +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.transport import LCMTransport +from dimos.manipulation.manipulation_module import ManipulationModule +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.robot.catalog.openarm import ( + OPENARM_V10_FK_MODEL, + openarm_arm as _openarm, + openarm_single as _openarm_single, +) +from dimos.teleop.keyboard.keyboard_teleop_module import KeyboardTeleopModule + +# ── Mock bimanual: no hardware, great for verifying wiring ───────────── +_mock_left = _openarm(side="left") +_mock_right = _openarm(side="right") + +coordinator_openarm_mock = ControlCoordinator.blueprint( + hardware=[_mock_left.to_hardware_component(), _mock_right.to_hardware_component()], + tasks=[ + _mock_left.to_task_config(), + _mock_right.to_task_config(), + ], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + +# ── Single-arm hardware blueprints (first real bring-up targets) ─────── +# CAN interface each physical arm is on. Linux assigns can0/can1 in USB +# enumeration order which isn't guaranteed stable — if the arms come up +# swapped, flip these two values. +LEFT_CAN = "can1" +RIGHT_CAN = "can0" + +# Flip to False to skip the CTRL_MODE=MIT write at connect-time — useful for +# verifying the setting persists across power cycles. Leave True for normal +# operation (idempotent; ensures motors work even if they were reflashed / +# replaced / factory-reset). +AUTO_SET_MIT_MODE = True + +_ADAPTER_KWARGS = {"auto_set_mit_mode": AUTO_SET_MIT_MODE} +_left_hw = _openarm( + side="left", + address=LEFT_CAN, + adapter_type="openarm", + adapter_kwargs=_ADAPTER_KWARGS, +) +_right_hw = _openarm( + side="right", + address=RIGHT_CAN, + adapter_type="openarm", + adapter_kwargs=_ADAPTER_KWARGS, +) + +coordinator_openarm_left = ControlCoordinator.blueprint( + hardware=[_left_hw.to_hardware_component()], + tasks=[_left_hw.to_task_config()], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + +coordinator_openarm_right = ControlCoordinator.blueprint( + hardware=[_right_hw.to_hardware_component()], + tasks=[_right_hw.to_task_config()], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + +# ── Bimanual hardware blueprint ──────────────────────────────────────── +coordinator_openarm_bimanual = ControlCoordinator.blueprint( + hardware=[_left_hw.to_hardware_component(), _right_hw.to_hardware_component()], + tasks=[ + _left_hw.to_task_config(), + _right_hw.to_task_config(), + ], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + + +# ── Planner + coordinator (mock): Drake plans, mock adapters execute ──── +# Great for visualizing motions in Meshcat with no hardware. +openarm_mock_planner_coordinator = autoconnect( + ManipulationModule.blueprint( + robots=[_mock_left.to_robot_model_config(), _mock_right.to_robot_model_config()], + planning_timeout=10.0, + enable_viz=True, + ), + ControlCoordinator.blueprint( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[_mock_left.to_hardware_component(), _mock_right.to_hardware_component()], + tasks=[ + _mock_left.to_task_config(), + _mock_right.to_task_config(), + ], + ), +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + +# ── Planner + coordinator (real hw): plan & execute on both arms ──────── +openarm_planner_coordinator = autoconnect( + ManipulationModule.blueprint( + robots=[_left_hw.to_robot_model_config(), _right_hw.to_robot_model_config()], + planning_timeout=10.0, + enable_viz=True, + ), + ControlCoordinator.blueprint( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[_left_hw.to_hardware_component(), _right_hw.to_hardware_component()], + tasks=[ + _left_hw.to_task_config(), + _right_hw.to_task_config(), + ], + ), +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + + +# ── Keyboard teleop (single arm, mock) ────────────────────────────────── +# pygame keyboard UI → Cartesian IK (Drake) → mock coordinator execution, +# with Drake/Meshcat visualization. Good for testing the single-arm URDF +# and IK without touching hardware. +_teleop_cfg = _openarm_single(name="arm") + +keyboard_teleop_openarm_mock = autoconnect( + KeyboardTeleopModule.blueprint(model_path=OPENARM_V10_FK_MODEL, ee_joint_id=_teleop_cfg.dof), + ControlCoordinator.blueprint( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[_teleop_cfg.to_hardware_component()], + tasks=[ + _teleop_cfg.to_task_config( + task_type="cartesian_ik", + task_name="cartesian_ik_arm", + model_path=OPENARM_V10_FK_MODEL, + ee_joint_id=_teleop_cfg.dof, + ), + ], + ), + ManipulationModule.blueprint( + robots=[_teleop_cfg.to_robot_model_config()], + enable_viz=True, + ), +).transports( + { + ("cartesian_command", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + +# ── Keyboard teleop (single arm, real hw on can0) ─────────────────────── +_teleop_hw_cfg = _openarm_single(name="arm", adapter_type="openarm", address=LEFT_CAN) + +keyboard_teleop_openarm = autoconnect( + KeyboardTeleopModule.blueprint(model_path=OPENARM_V10_FK_MODEL, ee_joint_id=_teleop_hw_cfg.dof), + ControlCoordinator.blueprint( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[_teleop_hw_cfg.to_hardware_component()], + tasks=[ + _teleop_hw_cfg.to_task_config( + task_type="cartesian_ik", + task_name="cartesian_ik_arm", + model_path=OPENARM_V10_FK_MODEL, + ee_joint_id=_teleop_hw_cfg.dof, + ), + ], + ), + ManipulationModule.blueprint( + robots=[_teleop_hw_cfg.to_robot_model_config()], + enable_viz=True, + ), +).transports( + { + ("cartesian_command", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + + +__all__ = [ + "coordinator_openarm_bimanual", + "coordinator_openarm_left", + "coordinator_openarm_mock", + "coordinator_openarm_right", + "keyboard_teleop_openarm", + "keyboard_teleop_openarm_mock", + "openarm_mock_planner_coordinator", + "openarm_planner_coordinator", +] diff --git a/dimos/robot/manipulators/openarm/scripts/openarm_can_probe.py b/dimos/robot/manipulators/openarm/scripts/openarm_can_probe.py new file mode 100755 index 0000000000..9c740ef485 --- /dev/null +++ b/dimos/robot/manipulators/openarm/scripts/openarm_can_probe.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Probe an OpenArm on a SocketCAN interface. + +Enumerates all 8 expected Damiao motors (7 arm joints + gripper) on one CAN bus +(classical by default, use --fd for CAN-FD), enables each, reads back one state +frame, then disables. Phase-0 hardware-verification script. + +Run AFTER bringing the bus up with dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh. + +Usage: + python dimos/robot/manipulators/openarm/scripts/openarm_can_probe.py --channel can0 + python dimos/robot/manipulators/openarm/scripts/openarm_can_probe.py --channel can1 --ids 1,2,3,4,5,6,7 +""" + +from __future__ import annotations + +import argparse +import sys +import time + +try: + import can +except ImportError: + sys.exit("python-can not installed. Run: pip install 'python-can>=4.3'") + +# ---- Damiao motor limit tables (from enactic/openarm_can dm_motor_constants.hpp) +# [p_max rad, v_max rad/s, t_max Nm] +LIMITS: dict[str, tuple[float, float, float]] = { + "DM4310": (12.5, 30.0, 10.0), + "DM4340": (12.5, 8.0, 28.0), + "DM8006": (12.5, 45.0, 40.0), +} + +# OpenArm v10 per-joint motor assignment (derived from joint_limits.yaml effort column) +DEFAULT_MOTORS: list[tuple[int, str]] = [ + (0x01, "DM8006"), # joint1 + (0x02, "DM8006"), # joint2 + (0x03, "DM4340"), # joint3 + (0x04, "DM4340"), # joint4 + (0x05, "DM4310"), # joint5 + (0x06, "DM4310"), # joint6 + (0x07, "DM4310"), # joint7 + (0x08, "DM4310"), # gripper +] + +ENABLE = bytes([0xFF] * 7 + [0xFC]) +DISABLE = bytes([0xFF] * 7 + [0xFD]) + +FD = False # set by --fd at runtime; defaults to classical CAN @ 1 Mbit + + +def uint_to_float(x: int, lo: float, hi: float, bits: int) -> float: + return x / ((1 << bits) - 1) * (hi - lo) + lo + + +def parse_state(motor_type: str, data: bytes) -> tuple[float, float, float, int, int] | None: + """Decode an 8-byte DM motor state reply. Returns (q, dq, tau, t_mos, t_rotor).""" + if len(data) < 8: + return None + p_max, v_max, t_max = LIMITS[motor_type] + q_u = (data[1] << 8) | data[2] + dq_u = (data[3] << 4) | (data[4] >> 4) + tau_u = ((data[4] & 0x0F) << 8) | data[5] + q = uint_to_float(q_u, -p_max, p_max, 16) + dq = uint_to_float(dq_u, -v_max, v_max, 12) + tau = uint_to_float(tau_u, -t_max, t_max, 12) + return q, dq, tau, data[6], data[7] + + +def probe_motor( + bus: can.BusABC, send_id: int, recv_id: int, motor_type: str, timeout: float = 0.2 +) -> bool: + """Enable motor, wait for state reply on recv_id, print result, disable.""" + # Flush any stale frames + while bus.recv(0.0) is not None: + pass + + bus.send( + can.Message( + arbitration_id=send_id, data=ENABLE, is_extended_id=False, is_fd=FD, bitrate_switch=FD + ) + ) + t0 = time.monotonic() + while time.monotonic() - t0 < timeout: + msg = bus.recv(timeout - (time.monotonic() - t0)) + if msg is None: + break + if msg.arbitration_id != recv_id: + continue + parsed = parse_state(motor_type, bytes(msg.data)) + if parsed is None: + print(f" 0x{send_id:02X} ({motor_type}): short reply {list(msg.data)}") + bus.send( + can.Message( + arbitration_id=send_id, + data=DISABLE, + is_extended_id=False, + is_fd=FD, + bitrate_switch=FD, + ) + ) + return False + q, dq, tau, t_mos, t_rot = parsed + print( + f" 0x{send_id:02X} ({motor_type:>6}): " + f"q={q:+.3f} rad dq={dq:+.3f} rad/s tau={tau:+.3f} Nm " + f"T_mos={t_mos}C T_rotor={t_rot}C" + ) + bus.send( + can.Message( + arbitration_id=send_id, + data=DISABLE, + is_extended_id=False, + is_fd=FD, + bitrate_switch=FD, + ) + ) + return True + + print( + f" 0x{send_id:02X} ({motor_type:>6}): NO REPLY on 0x{recv_id:02X} within {timeout * 1e3:.0f}ms" + ) + bus.send( + can.Message( + arbitration_id=send_id, data=DISABLE, is_extended_id=False, is_fd=FD, bitrate_switch=FD + ) + ) + return False + + +def main() -> int: + ap = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + ap.add_argument("--channel", default="can0", help="SocketCAN interface (default: can0)") + ap.add_argument( + "--fd", + action="store_true", + help="Use CAN-FD (requires FD-capable adapter). Default is classical CAN @ 1 Mbit, which is what most gs_usb adapters support.", + ) + ap.add_argument("--ids", default=None, help="Comma-separated send IDs to probe (default: 1..8)") + ap.add_argument("--timeout", type=float, default=0.2, help="Reply timeout per motor (s)") + args = ap.parse_args() + + global FD + FD = args.fd + motors = DEFAULT_MOTORS + if args.ids: + wanted = {int(x, 0) for x in args.ids.split(",")} + motors = [m for m in DEFAULT_MOTORS if m[0] in wanted] + + # Preflight: is the interface up? + try: + flags = int(open(f"/sys/class/net/{args.channel}/flags").read().strip(), 16) + iface_up = bool(flags & 0x1) + except OSError: + print(f"ERROR: interface '{args.channel}' not found", file=sys.stderr) + return 1 + if not iface_up: + print(f"ERROR: SocketCAN interface '{args.channel}' is DOWN.", file=sys.stderr) + print( + f" Run: sudo ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh {args.channel}", + file=sys.stderr, + ) + return 1 + + print(f"Opening {args.channel} ({'CAN-FD' if FD else 'classical CAN'})...") + try: + bus = can.Bus(interface="socketcan", channel=args.channel, fd=FD) + except Exception as e: + print(f"ERROR opening {args.channel}: {e}", file=sys.stderr) + print( + " Did you run 'sudo ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh' first?", + file=sys.stderr, + ) + return 1 + + try: + print(f"Probing {len(motors)} motor(s) on {args.channel}:") + ok = 0 + for send_id, motor_type in motors: + recv_id = send_id | 0x10 + if probe_motor(bus, send_id, recv_id, motor_type, args.timeout): + ok += 1 + print(f"\n{ok}/{len(motors)} motors replied.") + return 0 if ok == len(motors) else 2 + finally: + bus.shutdown() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh b/dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh new file mode 100755 index 0000000000..d25fc41e43 --- /dev/null +++ b/dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# Bring up CAN interfaces for OpenArm. Default is classical CAN @ 1 Mbit, +# which is what most gs_usb (OpenMoko / Geschwister Schneider) USB-CAN +# adapters support. Use MODE=fd if you have a CAN-FD-capable adapter. +# Run with sudo or as root. +# +# Usage: +# sudo ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh # classical 1M, can0 and can1 +# sudo ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh can0 # single interface +# sudo MODE=fd ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh can0 # CAN-FD 1M/5M +set -euo pipefail + +BITRATE=1000000 +DBITRATE=5000000 +MODE="${MODE:-classical}" # classical | fd +IFACES_ARG="${*:-can0 can1}" +# shellcheck disable=SC2206 +IFACES=(${IFACES_ARG[@]}) + +for IF in "${IFACES[@]}"; do + if ! ip link show "$IF" >/dev/null 2>&1; then + echo "[skip] $IF not present" + continue + fi + ip link set "$IF" down || true + if [ "$MODE" = "classical" ]; then + echo "[up ] $IF ${BITRATE} (classical CAN)" + ip link set "$IF" type can bitrate "$BITRATE" + else + echo "[up ] $IF ${BITRATE}/${DBITRATE} fd on" + ip link set "$IF" type can bitrate "$BITRATE" dbitrate "$DBITRATE" fd on + fi + ip link set "$IF" up + ip link set "$IF" txqueuelen 1000 + ip -details link show "$IF" | grep -E "can |bitrate" || true +done diff --git a/dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py b/dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py new file mode 100755 index 0000000000..01d9bbdb17 --- /dev/null +++ b/dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Write CTRL_MODE = MIT (1) to one or all OpenArm motors. + +Damiao motors have a persistent CTRL_MODE register (RID=10). If a motor was +previously configured in POS_VEL (2) / VEL (3) / POS_FORCE (4) mode, it will +respond to enable/disable but IGNORE MIT control frames — exactly the +"motor doesn't move, error grows" symptom. + +This script writes CTRL_MODE=1 (MIT) via the 0x7FF broadcast-write frame +format used by enactic/openarm_can: + + ID=0x7FF data = [id_lo, id_hi, 0x55, RID=10, val[0], val[1], val[2], val[3]] + +Run once per motor after CAN bring-up. The value is persistent across power +cycles. + +Usage: + # All 8 motors on can0 (classical CAN @ 1 Mbit, default) + python dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py --channel can0 + + # Single motor + python dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py --channel can0 --id 0x05 + + # CAN-FD (only if your adapter supports it) + python dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py --channel can0 --fd +""" + +from __future__ import annotations + +import argparse +import struct +import sys +import time + +try: + import can +except ImportError: + sys.exit("python-can not installed") + +RID_CTRL_MODE = 10 +MIT_MODE = 1 +DEFAULT_IDS = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08] + + +def write_ctrl_mode(bus: can.BusABC, send_id: int, fd: bool) -> bool: + val = struct.pack("> 8) & 0xFF, 0x55, RID_CTRL_MODE, val[0], val[1], val[2], val[3]] + ) + # Flush + while bus.recv(0.0) is not None: + pass + bus.send( + can.Message( + arbitration_id=0x7FF, data=data, is_extended_id=False, is_fd=fd, bitrate_switch=fd + ) + ) + # Wait for ack on 0x7FF (per openarm_can param response) + t0 = time.monotonic() + while time.monotonic() - t0 < 0.2: + msg = bus.recv(0.2 - (time.monotonic() - t0)) + if msg is None: + break + # Reply echoes 0x55 in byte 2 of the 0x7FF channel + if len(msg.data) >= 4 and msg.data[2] in (0x33, 0x55): + rid = msg.data[3] + if rid == RID_CTRL_MODE: + echoed = int(struct.unpack(" int: + ap = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + ap.add_argument("--channel", default="can0") + ap.add_argument("--fd", action="store_true", help="Use CAN-FD (default: classical CAN)") + ap.add_argument( + "--id", type=lambda s: int(s, 0), default=None, help="Single send ID (default: all 8)" + ) + args = ap.parse_args() + + fd = args.fd + ids = [args.id] if args.id is not None else DEFAULT_IDS + + # Preflight: is the interface up? + try: + flags = int(open(f"/sys/class/net/{args.channel}/flags").read().strip(), 16) + except OSError: + print(f"ERROR: interface '{args.channel}' not found", file=sys.stderr) + return 1 + if not (flags & 0x1): + print(f"ERROR: SocketCAN interface '{args.channel}' is DOWN.", file=sys.stderr) + print( + f" Run: sudo ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh {args.channel}", + file=sys.stderr, + ) + return 1 + + print(f"Opening {args.channel} ({'CAN-FD' if fd else 'classical'})") + bus = can.Bus(interface="socketcan", channel=args.channel, fd=fd) + try: + ok = 0 + for i in ids: + if write_ctrl_mode(bus, i, fd): + ok += 1 + time.sleep(0.05) + print(f"\n{ok}/{len(ids)} motors set to MIT mode.") + return 0 if ok == len(ids) else 2 + finally: + bus.shutdown() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/dimos/utils/workspace.py b/dimos/utils/workspace.py new file mode 100644 index 0000000000..70d21f5a7e --- /dev/null +++ b/dimos/utils/workspace.py @@ -0,0 +1,314 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Reachability + Yoshikawa-manipulability workspace analysis via Pinocchio. + +Run ``python -m dimos.utils.workspace [viz|query|suggest|interactive]``. +""" + +from __future__ import annotations + +import argparse +from pathlib import Path +import sys +import time +from typing import Any + +import numpy as np + + +class WorkspaceMap: + """Sampled reachability + manipulability map. Works on any URDF Pinocchio can load.""" + + def __init__( + self, + urdf_path: str | Path, + n_samples: int = 100_000, + *, + ee_joint_id: int | None = None, + seed: int = 42, + ) -> None: + import pinocchio + + self._pin = pinocchio + self.model = pinocchio.buildModelFromUrdf(str(urdf_path)) + self.data = self.model.createData() + self.ee_id = ee_joint_id if ee_joint_id is not None else (self.model.njoints - 1) + self.q_lo = self.model.lowerPositionLimit.copy() + self.q_hi = self.model.upperPositionLimit.copy() + self._sample(n_samples, seed) + + def _sample(self, n: int, seed: int) -> None: + rng = np.random.default_rng(seed) + self.positions = np.empty((n, 3)) + self.configs = np.empty((n, self.model.nq)) + self.manipulability = np.empty(n) + + for i in range(n): + q = rng.uniform(self.q_lo, self.q_hi) + self._pin.forwardKinematics(self.model, self.data, q) + self._pin.computeJointJacobians(self.model, self.data, q) + + self.positions[i] = self.data.oMi[self.ee_id].translation + self.configs[i] = q + + J = self._pin.getJointJacobian( + self.model, + self.data, + self.ee_id, + self._pin.ReferenceFrame.LOCAL_WORLD_ALIGNED, + ) + JJt = J[:3, :] @ J[:3, :].T + self.manipulability[i] = np.sqrt(max(0.0, np.linalg.det(JJt))) + + def query( + self, + target: np.ndarray | tuple[float, float, float], + radius: float = 0.03, + ) -> dict[str, Any]: + """Check reachability at a Cartesian target (x,y,z).""" + target = np.asarray(target, dtype=np.float64) + dists = np.linalg.norm(self.positions - target, axis=1) + mask = dists < radius + if int(mask.sum()) == 0: + for r in (0.05, 0.08, 0.12): + mask = dists < r + if mask.sum() > 0: + break + n_nearby = int(mask.sum()) + if n_nearby == 0: + nearest = int(np.argmin(dists)) + return { + "reachable": False, + "n_configs": 0, + "mean_manipulability": 0.0, + "nearest_distance": float(dists[nearest]), + "nearest_position": self.positions[nearest].tolist(), + "nearest_config": self.configs[nearest].tolist(), + } + manip_nearby = self.manipulability[mask] + indices = np.where(mask)[0] + best_idx = indices[int(np.argmax(manip_nearby))] + return { + "reachable": True, + "n_configs": n_nearby, + "mean_manipulability": float(manip_nearby.mean()), + "max_manipulability": float(manip_nearby.max()), + "best_config": self.configs[best_idx].tolist(), + "best_position": self.positions[best_idx].tolist(), + "distance": float(dists[best_idx]), + } + + def stats(self) -> str: + """Human-readable workspace summary (bounds, reach, hull volume).""" + p = self.positions + lines = [ + "Workspace stats:", + f" Samples: {len(p):,}", + f" X range: [{p[:, 0].min():.3f}, {p[:, 0].max():.3f}] m", + f" Y range: [{p[:, 1].min():.3f}, {p[:, 1].max():.3f}] m", + f" Z range: [{p[:, 2].min():.3f}, {p[:, 2].max():.3f}] m", + f" Max reach from origin: {np.linalg.norm(p, axis=1).max():.3f} m", + f" Manipulability: [{self.manipulability.min():.4f}, {self.manipulability.max():.4f}]", + ] + try: + from scipy.spatial import ConvexHull + + hull = ConvexHull(p) + lines.append(f" Convex hull volume: {hull.volume:.4f} m³") + except Exception: + pass + return "\n".join(lines) + + +# ── Meshcat visualization helpers ────────────────────────────────────────── + + +def _colormap(values: np.ndarray) -> np.ndarray: + """Red→green colormap, clipped at 2nd/98th percentile for contrast.""" + v = values.copy() + lo, hi = np.percentile(v, 2), np.percentile(v, 98) + v = np.clip((v - lo) / (hi - lo + 1e-12), 0.0, 1.0) + colors = np.zeros((len(v), 3), dtype=np.uint8) + colors[:, 0] = ((1.0 - v) * 255).astype(np.uint8) + colors[:, 1] = (v * 255).astype(np.uint8) + colors[:, 2] = 25 + return colors + + +def render_cloud(meshcat: Any, ws: WorkspaceMap, path: str = "/workspace") -> None: + """Render a WorkspaceMap's EE positions to Drake's Meshcat, colored by manipulability.""" + from pydrake.perception import BaseField, Fields, PointCloud + + cloud = PointCloud(len(ws.positions), Fields(int(BaseField.kXYZs) | int(BaseField.kRGBs))) + cloud.mutable_xyzs()[:] = ws.positions.T.astype(np.float32) + cloud.mutable_rgbs()[:] = _colormap(ws.manipulability).T + meshcat.SetObject(path, cloud, point_size=0.004) + + +def render_target( + meshcat: Any, + pos: list[float] | tuple[float, float, float], + name: str = "target", + color: tuple[float, float, float] = (1.0, 0.0, 0.0), +) -> None: + from pydrake.geometry import Rgba, Sphere + from pydrake.math import RigidTransform + + meshcat.SetObject(f"/{name}", Sphere(0.015), Rgba(*color, 0.8)) + meshcat.SetTransform(f"/{name}", RigidTransform(list(pos))) + + +# ── CLI ──────────────────────────────────────────────────────────────────── + + +def _cmd_viz(args: argparse.Namespace) -> int: + from pydrake.geometry import Meshcat + + ws = WorkspaceMap(args.urdf, args.samples, ee_joint_id=args.ee_joint_id) + print(ws.stats()) + meshcat = Meshcat() + print(f"\nMeshcat: {meshcat.web_url()}") + print("Green = dexterous, Red = near singularity\n") + render_cloud(meshcat, ws) + print("Press Ctrl-C to exit.") + try: + while True: + time.sleep(1.0) + except KeyboardInterrupt: + pass + return 0 + + +def _cmd_query(args: argparse.Namespace) -> int: + ws = WorkspaceMap(args.urdf, args.samples, ee_joint_id=args.ee_joint_id) + result = ws.query((args.x, args.y, args.z)) + print(f"\nTarget: ({args.x:.3f}, {args.y:.3f}, {args.z:.3f})") + if result["reachable"]: + print(f" REACHABLE — {result['n_configs']} configs within 3cm") + print(f" Mean manipulability: {result['mean_manipulability']:.4f}") + print(f" Best config: {[f'{q:.3f}' for q in result['best_config']]}") + p = result["best_position"] + print(f" Best EE position: ({p[0]:.3f}, {p[1]:.3f}, {p[2]:.3f})") + else: + print(" NOT REACHABLE") + p = result["nearest_position"] + print(f" Nearest reachable point: ({p[0]:.3f}, {p[1]:.3f}, {p[2]:.3f})") + print(f" Distance to nearest: {result['nearest_distance']:.3f} m") + print(f" Nearest config: {[f'{q:.3f}' for q in result['nearest_config']]}") + return 0 + + +def _cmd_suggest(args: argparse.Namespace) -> int: + ws = WorkspaceMap(args.urdf, args.samples, ee_joint_id=args.ee_joint_id) + target = np.array([args.x, args.y, args.z]) + dists = np.linalg.norm(ws.positions - target, axis=1) + closest = np.argsort(dists)[:20] + sorted_by_manip = sorted(closest, key=lambda i: -ws.manipulability[i]) + + print(f"\nSuggested poses near ({args.x:.3f}, {args.y:.3f}, {args.z:.3f}):") + print(f"{'#':>3} {'dist':>6} {'manip':>7} {'position':>30} joint config") + print("-" * 100) + for rank, idx in enumerate(sorted_by_manip[:10], 1): + p, q = ws.positions[idx], ws.configs[idx] + pos_str = f"({p[0]:+.3f}, {p[1]:+.3f}, {p[2]:+.3f})" + q_str = "[" + ", ".join(f"{v:.2f}" for v in q) + "]" + print( + f"{rank:>3} {dists[idx]:>6.3f} {ws.manipulability[idx]:>7.4f} {pos_str:>30} {q_str}" + ) + return 0 + + +def _cmd_interactive(args: argparse.Namespace) -> int: + from pydrake.geometry import Meshcat + + ws = WorkspaceMap(args.urdf, args.samples, ee_joint_id=args.ee_joint_id) + print(ws.stats()) + meshcat = Meshcat() + print(f"\nMeshcat: {meshcat.web_url()}") + render_cloud(meshcat, ws) + print("\nType target as 'x y z' (meters), or 'q' to quit.\n") + + while True: + try: + line = input("target> ").strip() + except (EOFError, KeyboardInterrupt): + break + if line.lower() in ("q", "quit", "exit"): + break + parts = line.split() + if len(parts) != 3: + print(" Enter three floats: x y z") + continue + try: + x, y, z = (float(p) for p in parts) + except ValueError: + print(" Invalid input") + continue + + result = ws.query((x, y, z)) + if result["reachable"]: + render_target(meshcat, (x, y, z), "target", (0.0, 1.0, 0.0)) + render_target(meshcat, result["best_position"], "best_ee", (0.0, 0.5, 1.0)) + print( + f" REACHABLE — {result['n_configs']} configs, " + f"manip={result['mean_manipulability']:.4f}" + ) + print(f" Joint config: {[round(q, 3) for q in result['best_config']]}") + else: + render_target(meshcat, (x, y, z), "target", (1.0, 0.0, 0.0)) + render_target(meshcat, result["nearest_position"], "nearest", (1.0, 0.5, 0.0)) + print(f" NOT REACHABLE — nearest is {result['nearest_distance']:.3f}m away") + print(f" Nearest config: {[round(q, 3) for q in result['nearest_config']]}") + return 0 + + +def main() -> int: + ap = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + ap.add_argument("urdf", type=Path, help="Path to a URDF parseable by Pinocchio") + ap.add_argument("--samples", type=int, default=100_000) + ap.add_argument( + "--ee-joint-id", + type=int, + default=None, + help="Pinocchio joint index for the end-effector (default: last joint)", + ) + + sub = ap.add_subparsers(dest="command") + sub.add_parser("viz", help="Visualize workspace in Meshcat (default)") + sub.add_parser("interactive", help="Visualize + interactive target query") + q = sub.add_parser("query", help="Query if a target is reachable") + q.add_argument("x", type=float) + q.add_argument("y", type=float) + q.add_argument("z", type=float) + s = sub.add_parser("suggest", help="Suggest reachable poses near a target") + s.add_argument("x", type=float) + s.add_argument("y", type=float) + s.add_argument("z", type=float) + + args = ap.parse_args() + cmd = args.command or "viz" + return { + "viz": _cmd_viz, + "query": _cmd_query, + "suggest": _cmd_suggest, + "interactive": _cmd_interactive, + }[cmd](args) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/docs/capabilities/manipulation/openarm_integration.md b/docs/capabilities/manipulation/openarm_integration.md new file mode 100644 index 0000000000..49a78ccde7 --- /dev/null +++ b/docs/capabilities/manipulation/openarm_integration.md @@ -0,0 +1,387 @@ +# OpenArm Integration + +Guide for running the **OpenArm** — an open-source bimanual 7-DOF research arm built from Damiao DM-J quasi-direct-drive motors — under the dimos manipulation + control stack. + +**If you're standing in front of the hardware and just want to run it, skip to [Quick start](#quick-start).** + +Related: +- Upstream hardware + C++ reference: [enactic/openarm_can](https://github.com/enactic/openarm_can) +- How to integrate any new arm: [adding_a_custom_arm.md](/docs/capabilities/manipulation/adding_a_custom_arm.md) + +--- + +## Why this integration is different + +Every other arm in dimos wraps a vendor Python SDK: + +| Arm | Transport | Python SDK | +|---|---|---| +| xArm | TCP/IP | `xarm-python-sdk` | +| Piper | CAN (via SDK) | `piper_sdk` | +| R1 Pro | Galaxea | Galaxea SDK | +| Go2 / G1 | WebRTC | Unitree SDK | +| Panda | FCI | `panda-py` | + +**OpenArm ships no Python SDK.** The only interface is raw CAN frames on the wire, speaking the Damiao MIT-mode protocol. So dimos includes a from-scratch driver that encodes/decodes the protocol directly on a SocketCAN bus. The reference implementation is the Enactic C++ library at [enactic/openarm_can](https://github.com/enactic/openarm_can) — we port the frame layout from there. + +## Architecture + +``` +ManipulationModule → ControlCoordinator → OpenArmAdapter → OpenArmBus → SocketCAN → arm + (Drake plan) (100Hz tick loop) (dimos protocol) (CAN driver) +``` + +Code layout: + +``` +dimos/hardware/manipulators/openarm/ +├── driver.py # OpenArmBus, DamiaoMotor — pure CAN driver, no dimos deps +├── adapter.py # OpenArmAdapter — implements dimos ManipulatorAdapter protocol +├── test_driver.py # 13 unit tests (virtual CAN loopback, no hardware) +└── test_adapter.py # 11 unit tests (virtual CAN + mock state frames) + +dimos/robot/catalog/openarm.py # openarm_arm() and openarm_single() config factories +dimos/robot/manipulators/openarm/ +├── blueprints.py # coordinator-*, planner-*, keyboard-teleop-* blueprints +└── scripts/ # bring-up + diagnostic scripts (run manually by humans) + ├── openarm_can_up.sh # bring SocketCAN interfaces up (needs sudo) + ├── openarm_can_probe.py # enumerate & read state from all 8 motors + ├── openarm_set_mit_mode.py # one-time CTRL_MODE=MIT write per motor + └── ... (diagnostics) + +data/openarm_description/ # URDF + meshes (in-tree; may migrate to LFS) +└── urdf/robot/ + ├── openarm_v10_bimanual.urdf # both arms (14 DOF, used by coordinator) + ├── openarm_v10_left.urdf # left arm + torso (7 DOF, per-side planning) + ├── openarm_v10_right.urdf # right arm + torso (7 DOF) + └── openarm_v10_single.urdf # standalone arm (Pinocchio FK for teleop) +``` + +Workspace analysis is generic and lives in [dimos/utils/workspace.py](/dimos/utils/workspace.py) — works for any URDF, not just OpenArm. + +--- + +## Quick start + +You need: + +- 2× **OpenArm v10** arms, wired to USB-CAN adapters +- 2× **USB-CAN adapters** (we used gs_usb family, VID:PID `1d50:606f`, e.g. CANable 2.0). Classical CAN @ 1 Mbit is enough; CAN-FD not required +- **Python 3.12 venv with dimos installed** plus `python-can >= 4.3` and `pinocchio` +- **sudo** on first run (to bring up the CAN interfaces) + +### 1. Bring up the CAN buses + +```bash +sudo ./dimos/robot/manipulators/openarm/scripts/openarm_can_up.sh can0 can1 +``` + +This sets both interfaces to classical CAN @ 1 Mbit with a 1000-frame TX queue (enough headroom for the 100 Hz tick loop). If only one bus is present, pass just that one: `sudo ... openarm_can_up.sh can0`. + +**Troubleshooting:** +- `Operation not permitted` → you forgot `sudo`. +- `Operation not supported` on `fd on` → your adapter doesn't support CAN-FD. The script defaults to classical, so this shouldn't happen unless you set `MODE=fd`. +- Only one `can*` interface appears → the other adapter isn't enumerating. On gs_usb boards, the **blue LED** indicates USB enumeration. If one adapter only shows red/green, swap the USB cable (many USB-C cables are charge-only). + +### 2. Verify all 16 motors are alive + +```bash +python ./dimos/robot/manipulators/openarm/scripts/openarm_can_probe.py --channel can0 +python ./dimos/robot/manipulators/openarm/scripts/openarm_can_probe.py --channel can1 +``` + +Expected: `8/8 motors replied` on each bus, with plausible joint positions and rotor temps around 25–30 °C. + +### 3. (First time only) Put motors in MIT mode + +Damiao motors have a persistent `CTRL_MODE` register. They ship in POS_VEL mode by default, which means they will reply to enable/state queries but **silently ignore** any MIT control frames — the "motor doesn't move, error grows" failure. The adapter writes MIT on every `connect()` by default, so this step is usually automatic. If you want to set it explicitly once: + +```bash +python ./dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py --channel can0 +python ./dimos/robot/manipulators/openarm/scripts/openarm_set_mit_mode.py --channel can1 +``` + +The register is persistent across power cycles, so you only need this once per motor (or after a firmware reset). + +### 4. Run a blueprint + +| Blueprint | What it does | +|---|---| +| `coordinator-openarm-mock` | Bimanual, mock adapters. No hardware. | +| `openarm-mock-planner-coordinator` | Drake planner + bimanual mock, Meshcat viz. Great smoke test. | +| `coordinator-openarm-left` / `coordinator-openarm-right` | Single arm, real hardware on can0 / can1. | +| `coordinator-openarm-bimanual` | Both arms, real hardware, no planner. | +| `openarm-planner-coordinator` | **Main usable blueprint** — Drake planner + both arms on real hardware. | +| `keyboard-teleop-openarm-mock` / `keyboard-teleop-openarm` | Single-arm Cartesian IK + pygame keyboard, mock / real. | + +**Safety before hot-plugging hardware:** hold the arms before starting. On connect, the adapter enables all motors and sends gravity-comp holds — the arms go slightly stiff but don't leap. Ctrl-C to cleanly disable and exit. + +First-time recommendation: mock planner to verify everything wires up, then real single-arm, then bimanual. + +```bash +# smoke test (no hardware) +dimos run openarm-mock-planner-coordinator + +# single-arm bring-up (hold the arm physically first) +dimos run coordinator-openarm-left + +# full bimanual with planner +dimos run openarm-planner-coordinator +``` + +Meshcat will appear at http://localhost:7000. + +### 5. Drive the arms from the manipulation client + +With `openarm-planner-coordinator` running in one terminal, open a second terminal and start the REPL client: + +```bash +python -i -m dimos.manipulation.planning.examples.manipulation_client +``` + +This gives you an interactive Python prompt with these functions: + +| Function | Purpose | +|---|---| +| `robots()` | List configured robots (here: `["left_arm", "right_arm"]`) | +| `joints(robot_name)` | Read current joint positions (7 floats) | +| `ee(robot_name)` | Read current end-effector pose | +| `state()` | Module state: `IDLE`, `PLANNING`, `EXECUTING`, `FAULT`, etc. | +| `plan([q1..q7], robot_name)` | Plan a collision-free trajectory to a joint configuration | +| `plan_pose(x, y, z, robot_name=...)` | Plan to a Cartesian EE pose (preserves current orientation) | +| `preview(robot_name)` | Animate the planned path in Meshcat without executing | +| `execute(robot_name)` | Send the planned trajectory to the coordinator | +| `home(robot_name)` | Plan + execute to home joints | +| `commands()` | Print all available functions | + +#### Example session — simple joint moves + +```python +>>> robots() +['left_arm', 'right_arm'] + +>>> joints(robot_name="left_arm") +[0.02, -0.01, -0.13, 0.15, 0.17, -0.07, 0.10] + +>>> # One-liner: plan → preview in Meshcat → execute on hardware +>>> plan([0.3, 0, 0, 0, 0, 0, 0], robot_name="left_arm") and preview(robot_name="left_arm") and execute(robot_name="left_arm") +True + +>>> joints(robot_name="left_arm") +[0.30, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00] # arm is now at the commanded pose +``` + +`plan()` returns `True` on success, `False` if planning failed (check the coordinator terminal for `COLLISION_AT_GOAL`, `INVALID_START`, `NO_SOLUTION`, etc). The `and` chaining is an idiom — if any step fails, the next one is short-circuited. + +If you ever get stuck in a `FAULT` state (e.g. an invalid plan was sent), reset the state machine: + +```python +>>> _client.reset() +'Reset to IDLE — ready for new commands' +``` + +#### Example session — bimanual + +```python +>>> # Move both arms to mirrored poses +>>> plan([0.5, 0, 0, 0, 0, 0, 0], robot_name="left_arm") and execute(robot_name="left_arm") +True +>>> plan([-0.5, 0, 0, 0, 0, 0, 0], robot_name="right_arm") and execute(robot_name="right_arm") +True +``` + +Each arm plans and executes independently — the coordinator runs both trajectories simultaneously on separate tick-loop tasks. + +#### Example session — Cartesian target + +```python +>>> ee(robot_name="left_arm") # see where the EE currently is +>>> plan_pose(0.1, 0.3, 0.5, robot_name="left_arm") and preview(robot_name="left_arm") +True +>>> execute(robot_name="left_arm") +True +``` + +If you don't know which Cartesian targets are reachable, check first with the workspace tool — see [Workspace analysis](#workspace-analysis) below. `plan_pose` will fail with `NO_SOLUTION` if the IK can't find a configuration reaching the target. + +#### Adding obstacles + +```python +>>> add_box("table", 0.4, 0.0, 0.1, w=0.6, h=0.4, d=0.05) # rectangular obstacle +>>> add_sphere("ball", 0.3, 0.2, 0.4, radius=0.05) +>>> plan_pose(0.4, 0.0, 0.3, robot_name="left_arm") # now plans around it +>>> remove("table") # id returned by add_* +``` + +--- + +## Configuration + +### Which CAN bus is which arm + +Linux assigns `can0`/`can1` in USB-enumeration order, which isn't guaranteed stable across reboots or cable swaps. If the arms come up "swapped" (commanding `left_arm` moves the physical right arm), flip these two constants at the top of [blueprints.py](/dimos/robot/manipulators/openarm/blueprints.py): + +```python +LEFT_CAN = "can0" +RIGHT_CAN = "can1" +``` + +No other code changes are needed. + +### Gain tuning (MIT kp/kd) + +Defaults live in [adapter.py](/dimos/hardware/manipulators/openarm/adapter.py). Gains are per-joint because the shoulder motors (DM8006, 40 Nm) tolerate higher kp than the wrist motors (DM4310, 10 Nm): + +```python +_DEFAULT_KP = [100.0, 100.0, 80.0, 80.0, 60.0, 60.0, 60.0] +_DEFAULT_KD = [1.5, 1.5, 1.0, 1.0, 0.8, 0.8, 0.8] +``` + +Guidelines: +- `kp ∈ [0, 500]` in MIT mode. Higher kp = stiffer position tracking; too high → oscillation. +- `kd ∈ [0, 5]`. Higher kd = more damping, but values above ~2 on these gearboxes cause high-frequency buzz/grinding. +- Gravity compensation is on by default (`gravity_comp=True`) — the adapter uses Pinocchio to compute `G(q)` and adds it as feedforward torque. This removes the need for very high kp to fight gravity, so prefer low kp + gravity comp over high kp. + +### Physical joint limits + +The URDFs use the xacro-generated limits (which include per-side offsets for mirroring). The adapter's `get_limits()` reports the same per-side limits. If you measure tighter physical limits and want to enforce them, edit the URDFs directly — the planner will respect them. + +### Disabling auto MIT-mode write + +The adapter writes `CTRL_MODE=MIT` to every motor at `connect()`. It's idempotent (writing the same value is a no-op), so this is safe to leave on. To verify that a previous write persisted across a power cycle, flip `AUTO_SET_MIT_MODE = False` in [blueprints.py](/dimos/robot/manipulators/openarm/blueprints.py) and restart — the arms should still respond. + +--- + +## Motor mapping (OpenArm v10) + +Derived from the URDF's `joint_limits.yaml` (effort column) cross-checked against the Damiao torque tables. Both arms are identical. + +| Send ID | Recv ID | Joint | Motor | vMax [rad/s] | tMax [Nm] | +|---|---|---|---|---|---| +| 0x01 | 0x11 | joint1 | DM8006 | 45 | 40 | +| 0x02 | 0x12 | joint2 | DM8006 | 45 | 40 | +| 0x03 | 0x13 | joint3 | DM4340 | 8 | 28 | +| 0x04 | 0x14 | joint4 | DM4340 | 8 | 28 | +| 0x05 | 0x15 | joint5 | DM4310 | 30 | 10 | +| 0x06 | 0x16 | joint6 | DM4310 | 30 | 10 | +| 0x07 | 0x17 | joint7 | DM4310 | 30 | 10 | +| 0x08 | 0x18 | gripper | DM4310 | 30 | 10 | + +Convention: `recv_id = send_id | 0x10`. + +--- + +## Damiao protocol essentials + +Ported from `enactic/openarm_can/src/openarm/damiao_motor/dm_motor_control.cpp`. You shouldn't need these unless you're modifying the driver. + +### Enable / disable / zero-position + +Send to the motor's send_id. 8-byte payload: + +``` +[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, CMD] + where CMD = 0xFC (enable) | 0xFD (disable) | 0xFE (zero current pose) +``` + +### MIT control frame (8 bytes) + +Bit layout: `q[16] | dq[12] | kp[12] | kd[12] | tau[12]`. Each float quantized via: + +```python +def float_to_uint(x, lo, hi, bits): + x = clamp(x, lo, hi) + return round((x - lo) / (hi - lo) * ((1 << bits) - 1)) +``` + +Gain ranges: `kp ∈ [0, 500]`, `kd ∈ [0, 5]`. Position/velocity/torque ranges come from the motor-type table above. + +Byte layout: +``` +byte0 = q_u >> 8 +byte1 = q_u & 0xFF +byte2 = dq_u >> 4 +byte3 = ((dq_u & 0xF) << 4) | ((kp_u >> 8) & 0xF) +byte4 = kp_u & 0xFF +byte5 = kd_u >> 4 +byte6 = ((kd_u & 0xF) << 4) | ((tau_u >> 8) & 0xF) +byte7 = tau_u & 0xFF +``` + +### State reply (8 bytes, on recv_id) + +Same `q | dq | tau` layout + 2 temperature bytes: + +``` +byte0 = motor_id_echo +byte1..5 = q | dq | tau (same packing as above) +byte6 = t_mos (°C) +byte7 = t_rotor (°C) +``` + +### CTRL_MODE register write + +Broadcast frame on CAN ID `0x7FF`: + +``` +data = [send_id_lo, send_id_hi, 0x55, RID=10, val[0..3]] + where val = 1 (MIT) | 2 (POS_VEL) | 3 (VEL) | 4 (POS_FORCE), little-endian uint32 +``` + +Persistent across power cycles. + +--- + +## Known gotchas + +- **`ip link ... fd on` → `Operation not supported`.** gs_usb firmware doesn't support CAN-FD. Use classical CAN @ 1 Mbit (our bringup script's default). +- **Motors reply to probes but commands do nothing.** CTRL_MODE is not MIT. The adapter now writes MIT on connect, but if you disabled that and motors got reset, run `openarm_set_mit_mode.py`. +- **`COLLISION_AT_START` during planning.** `link5` and `link7` collision meshes overlap by 3 mm at every configuration. Handled by `OPENARM_COLLISION_EXCLUSIONS` in the catalog. If you see it anyway, the exclusion pairs may not be getting applied — check that the collision filter log line appears during world build. +- **`INVALID_START` during planning.** Hardware encoder noise pushed a joint 1 mrad past a URDF limit. Joint4 used to be exactly `lower=0.0` which tripped this — it's now `-0.01` to give breathing room. If you see it on a different joint, widen that limit by ~10 mrad. +- **"Transmit buffer full" (ENOBUFS) at 100 Hz.** Kernel TX queue too small. The bringup script sets `txqueuelen 1000`; the driver also retries on ENOBUFS. If you still see the error, check `ip -details link show canX | grep qlen`. +- **Arms swap sides.** USB enumeration order flipped. Swap `LEFT_CAN` / `RIGHT_CAN` in [blueprints.py](/dimos/robot/manipulators/openarm/blueprints.py). + +--- + +## Design decisions + +- **Driver separate from adapter.** `driver.py` has zero dimos deps → unit-testable with a virtual CAN bus, reusable outside dimos. +- **MIT mode for everything.** MIT can emulate position (high kp), velocity (kp=0, nonzero kd+dq), and torque (kp=kd=0, nonzero tau). One code path. +- **Gravity compensation on by default.** Eliminates steady-state position error without needing high kp. Needs Pinocchio + the per-side URDFs. +- **One adapter per CAN bus, keyed by `address`.** Matches the Piper adapter pattern. Bimanual = two adapters with different `address` values. +- **Per-side URDFs for Drake planning.** Loading the full 14-DOF bimanual URDF twice (once per robot instance) creates phantom-arm collisions with the "other" arm frozen at zero. The per-side URDFs keep only one arm's links + the torso, avoiding the phantom collisions while matching the bimanual kinematics exactly. +- **URDF stays in-tree (`data/openarm_description/`) for now.** Can migrate to LFS later — only the path constant in the catalog changes. +- **CAN bringup stays manual (`sudo`).** Auto-bringup from `connect()` would need sudo-in-a-library or a systemd unit; the explicit script is clearer and testable. For production, add a oneshot systemd unit that runs the script at boot. + +--- + +## Workspace analysis + +For figuring out which targets are reachable before planning, use the generic workspace tool: + +```bash +# Visualize the left arm's reachable workspace as a point cloud +python -m dimos.utils.workspace data/openarm_description/urdf/robot/openarm_v10_left.urdf + +# Check if a specific target is reachable +python -m dimos.utils.workspace data/openarm_description/urdf/robot/openarm_v10_left.urdf query 0.1 0.3 0.5 + +# Get a list of reachable poses near a target, ranked by manipulability +python -m dimos.utils.workspace data/openarm_description/urdf/robot/openarm_v10_left.urdf suggest 0.1 0.3 0.5 + +# Interactive: visualize + type targets to query +python -m dimos.utils.workspace data/openarm_description/urdf/robot/openarm_v10_left.urdf interactive +``` + +Points are colored by Yoshikawa manipulability index: green = dexterous, red = near singularity. Avoid planning targets in the red regions. + +--- + +## Testing + +```bash +# Unit tests (no hardware, use virtual CAN) +.venv/bin/python -m pytest dimos/hardware/manipulators/openarm/ -v +``` + +Expected: 24 passed (13 driver + 11 adapter). All tests use `can.Bus(interface="virtual")` loopback — no real hardware needed. diff --git a/pyproject.toml b/pyproject.toml index 074670cb60..398903f457 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -214,6 +214,10 @@ manipulation = [ "pyrealsense2; sys_platform != 'darwin'", "xarm-python-sdk>=1.17.0", + # Mesh conversion (STL/DAE → OBJ for Drake collision geometry) + "trimesh", + "pycollada", + # Visualization (Optional) "kaleido>=0.2.1", "plotly>=5.9.0", diff --git a/uv.lock b/uv.lock index 6e94931740..f429ae7b71 100644 --- a/uv.lock +++ b/uv.lock @@ -1848,8 +1848,10 @@ manipulation = [ { name = "matplotlib" }, { name = "piper-sdk" }, { name = "plotly" }, + { name = "pycollada" }, { name = "pyrealsense2", marker = "sys_platform != 'darwin'" }, { name = "pyyaml" }, + { name = "trimesh" }, { name = "xacro" }, { name = "xarm-python-sdk" }, ] @@ -2061,6 +2063,7 @@ requires-dist = [ { name = "psutil", specifier = ">=7.0.0" }, { name = "psycopg2-binary", marker = "extra == 'psql'", specifier = ">=2.9.11" }, { name = "py-spy", marker = "extra == 'dev'" }, + { name = "pycollada", marker = "extra == 'manipulation'" }, { name = "pydantic" }, { name = "pydantic", marker = "extra == 'docker'" }, { name = "pydantic-settings", specifier = ">=2.11.0,<3" }, @@ -2112,6 +2115,7 @@ requires-dist = [ { name = "toolz", specifier = ">=1.1.0" }, { name = "torchreid", marker = "extra == 'misc'", specifier = "==0.2.5" }, { name = "transformers", extras = ["torch"], marker = "extra == 'perception'", specifier = "==4.49.0" }, + { name = "trimesh", marker = "extra == 'manipulation'" }, { name = "typeguard", marker = "extra == 'misc'" }, { name = "typer", specifier = ">=0.19.2,<1" }, { name = "typer", marker = "extra == 'docker'", specifier = ">=0.19.2,<1" }, @@ -7381,6 +7385,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3a/d8/a211b3f85e99a0daa2ddec96c949cac6824bd305b040571b82a03dd62636/pycodestyle-2.12.1-py2.py3-none-any.whl", hash = "sha256:46f0fb92069a7c28ab7bb558f05bfc0110dac69a0cd23c61ea0040283a9d78b3", size = 31284, upload-time = "2024-08-04T20:26:53.173Z" }, ] +[[package]] +name = "pycollada" +version = "0.9.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "python-dateutil" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5a/8d/52a5364a17eb96129962cae8d3ee7658775e085ad0ba38388684ad5944e9/pycollada-0.9.3.tar.gz", hash = "sha256:c34d6dcf0fe2eba5896f71c96d37a1c0fe1a61f08440fa0cfcec3dc2895d3302", size = 110826, upload-time = "2026-01-24T15:45:23.625Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/86/f1f61b7a0701f9d1299e5293d083318019f91021a4d449f94d59dbe024e4/pycollada-0.9.3-py3-none-any.whl", hash = "sha256:636e6496f60987586db82455ea7bbd9ade775e8181c6590c83b698b6cd53a9f5", size = 129206, upload-time = "2026-01-24T15:45:22.182Z" }, +] + [[package]] name = "pycparser" version = "3.0"