diff --git a/dimos/core/coordination/module_coordinator.py b/dimos/core/coordination/module_coordinator.py index f6c82a84ff..0c7b8c9788 100644 --- a/dimos/core/coordination/module_coordinator.py +++ b/dimos/core/coordination/module_coordinator.py @@ -29,7 +29,7 @@ from dimos.core.global_config import GlobalConfig, global_config from dimos.core.module import ModuleBase, ModuleSpec from dimos.core.resource import Resource -from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport +from dimos.core.transport import ZENOH_AVAILABLE, LCMTransport, PubSubTransport, pLCMTransport from dimos.spec.utils import spec_annotation_compliance, spec_structural_compliance from dimos.utils.generic import short_id from dimos.utils.logging_config import setup_logger @@ -542,7 +542,23 @@ def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> Pu use_pickled = getattr(stream_type, "lcm_encode", None) is None topic = f"/{name}" if _is_name_unique(blueprint, name) else f"/{short_id()}" - transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type) + + if global_config.transport == "zenoh": + if not ZENOH_AVAILABLE: + raise RuntimeError( + "transport='zenoh' but eclipse-zenoh is not installed. " + "Install with: uv sync --extra zenoh" + ) + from dimos.core.transport import ZenohTransport, pZenohTransport + + zenoh_topic = f"dimos{topic}" + transport = ( + pZenohTransport(zenoh_topic) + if use_pickled + else ZenohTransport(zenoh_topic, stream_type) + ) + else: + transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type) return transport @@ -612,7 +628,8 @@ def _run_configurators(blueprint: Blueprint) -> None: from dimos.protocol.service.system_configurator.base import configure_system from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators - configurators = [*lcm_configurators(), *blueprint.configurator_checks] + lcm_checks = lcm_configurators() if global_config.transport == "lcm" else [] + configurators = [*lcm_checks, *blueprint.configurator_checks] try: configure_system(configurators) diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index ccf5b0644c..1e68369b0e 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -12,20 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +import platform import re from typing import Literal, TypeAlias +from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict +from dimos.core.transport import ZENOH_AVAILABLE from dimos.models.vl.types import VlModelName ViewerBackend: TypeAlias = Literal["rerun", "rerun-web", "rerun-connect", "foxglove", "none"] +TransportBackend: TypeAlias = Literal["lcm", "zenoh"] def _get_all_numbers(s: str) -> list[float]: return [float(x) for x in re.findall(r"-?\d+\.?\d*", s)] +def _default_transport() -> TransportBackend: + if platform.system() == "Darwin" and ZENOH_AVAILABLE: + return "zenoh" + return "lcm" + + class GlobalConfig(BaseSettings): robot_ip: str | None = None robot_ips: str | None = None @@ -52,6 +62,7 @@ class GlobalConfig(BaseSettings): nerf_speed: float = 1.0 planner_robot_speed: float | None = None mcp_port: int = 9990 + transport: TransportBackend = Field(default_factory=_default_transport) dtop: bool = False obstacle_avoidance: bool = True detection_model: VlModelName = "moondream" @@ -61,6 +72,7 @@ class GlobalConfig(BaseSettings): env_file=".env", env_file_encoding="utf-8", extra="ignore", + validate_assignment=True, ) def update(self, **kwargs: object) -> None: diff --git a/dimos/core/test_utils.py b/dimos/core/test_utils.py new file mode 100644 index 0000000000..c13c6128c2 --- /dev/null +++ b/dimos/core/test_utils.py @@ -0,0 +1,43 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared test helpers that must remain import-safe in minimal environments.""" + +from __future__ import annotations + +from collections.abc import Callable +import threading + + +def retry_until( + event: threading.Event, + action: Callable[[], None], + timeout: float = 2.0, + interval: float = 0.01, +) -> None: + """Retry an action until an Event fires. + + Useful for async test paths where the producer may need to retry briefly + before the subscriber is fully ready. + """ + deadline = threading.Event() + timer = threading.Timer(timeout, deadline.set) + timer.start() + try: + while not event.is_set() and not deadline.is_set(): + action() + event.wait(interval) + finally: + timer.cancel() + assert event.is_set(), f"Timed out after {timeout}s waiting for event" diff --git a/dimos/core/test_zenoh_transport.py b/dimos/core/test_zenoh_transport.py new file mode 100644 index 0000000000..bb230ab6eb --- /dev/null +++ b/dimos/core/test_zenoh_transport.py @@ -0,0 +1,271 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for Zenoh transport scaffold + +Tests the conditional logic added to support Zenoh alongside LCM: +- GlobalConfig transport field +- _get_transport_for() branching +- LCM configurator gating +""" + +from __future__ import annotations + +from typing import cast + +from pydantic import ValidationError +import pytest + +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.coordination.module_coordinator import _get_transport_for, _run_configurators +from dimos.core.global_config import GlobalConfig, global_config +from dimos.core.module import Module +from dimos.core.stream import In, Out +from dimos.core.test_utils import retry_until +from dimos.core.transport import ZENOH_AVAILABLE, LCMTransport, pLCMTransport +from dimos.msgs.sensor_msgs.Image import Image + + +class TypedMsg: + """A fake typed message with lcm_encode for testing.""" + + @staticmethod + def lcm_encode() -> bytes: + return b"" + + +class UntypedMsg: + """A message without lcm_encode — triggers pickle transport.""" + + pass + + +class ProducerModule(Module): + typed_data: Out[TypedMsg] + untyped_data: Out[UntypedMsg] + + +class ConsumerModule(Module): + typed_data: In[TypedMsg] + untyped_data: In[UntypedMsg] + + +class TestGlobalConfigTransportField: + def test_default_transport_is_lcm_on_linux(self, mocker) -> None: # type: ignore[no-untyped-def] + mocker.patch("dimos.core.global_config.platform.system", return_value="Linux") + mocker.patch("dimos.core.global_config.ZENOH_AVAILABLE", True) + + config = GlobalConfig() + assert config.transport == "lcm" + + def test_default_transport_is_zenoh_on_macos_when_available(self, mocker) -> None: # type: ignore[no-untyped-def] + mocker.patch("dimos.core.global_config.platform.system", return_value="Darwin") + mocker.patch("dimos.core.global_config.ZENOH_AVAILABLE", True) + + config = GlobalConfig() + assert config.transport == "zenoh" + + def test_default_transport_stays_lcm_on_macos_without_zenoh(self, mocker) -> None: # type: ignore[no-untyped-def] + mocker.patch("dimos.core.global_config.platform.system", return_value="Darwin") + mocker.patch("dimos.core.global_config.ZENOH_AVAILABLE", False) + + config = GlobalConfig() + assert config.transport == "lcm" + + def test_transport_can_be_set_to_zenoh(self) -> None: + config = GlobalConfig() + config.update(transport="zenoh") + assert config.transport == "zenoh" + + def test_invalid_transport_is_rejected_at_init(self) -> None: + with pytest.raises(ValidationError, match="transport"): + GlobalConfig(transport=cast("object", "invalid")) + + def test_invalid_transport_is_rejected_on_update(self) -> None: + config = GlobalConfig() + with pytest.raises(ValidationError, match="transport"): + config.update(transport=cast("object", "invalid")) + + +class TestZenohAvailableGuard: + def test_zenoh_available_is_bool(self) -> None: + assert isinstance(ZENOH_AVAILABLE, bool) + + @pytest.mark.skipif(not ZENOH_AVAILABLE, reason="zenoh not installed") + def test_zenoh_transport_classes_exist_when_available(self) -> None: + from dimos.core.transport import ZenohTransport, pZenohTransport + + assert ZenohTransport is not None + assert pZenohTransport is not None + + +class TestGetTransportForBranching: + """Test that _get_transport_for() returns the right transport type based on config.""" + + def _make_blueprint(self): # type: ignore[no-untyped-def] + return autoconnect(ProducerModule.blueprint(), ConsumerModule.blueprint()) + + def test_lcm_transport_returned_when_transport_is_lcm(self, mocker) -> None: + mocker.patch.object(global_config, "transport", "lcm") + bp = self._make_blueprint() + transport = _get_transport_for(bp, "typed_data", TypedMsg) + assert isinstance(transport, LCMTransport) + + def test_lcm_pickle_transport_returned_for_untyped_when_lcm(self, mocker) -> None: + mocker.patch.object(global_config, "transport", "lcm") + bp = self._make_blueprint() + transport = _get_transport_for(bp, "untyped_data", UntypedMsg) + assert isinstance(transport, pLCMTransport) + + @pytest.mark.skipif(not ZENOH_AVAILABLE, reason="zenoh not installed") + def test_zenoh_transport_returned_when_transport_is_zenoh(self, mocker) -> None: + from dimos.core.transport import ZenohTransport + + mocker.patch.object(global_config, "transport", "zenoh") + bp = self._make_blueprint() + transport = _get_transport_for(bp, "typed_data", TypedMsg) + assert isinstance(transport, ZenohTransport) + + @pytest.mark.skipif(not ZENOH_AVAILABLE, reason="zenoh not installed") + def test_zenoh_pickle_transport_returned_for_untyped_when_zenoh(self, mocker) -> None: + from dimos.core.transport import pZenohTransport + + mocker.patch.object(global_config, "transport", "zenoh") + bp = self._make_blueprint() + transport = _get_transport_for(bp, "untyped_data", UntypedMsg) + assert isinstance(transport, pZenohTransport) + + @pytest.mark.skipif(not ZENOH_AVAILABLE, reason="zenoh not installed") + def test_zenoh_topic_uses_dimos_prefix(self, mocker) -> None: + from dimos.core.transport import pZenohTransport + + mocker.patch.object(global_config, "transport", "zenoh") + bp = self._make_blueprint() + transport = _get_transport_for(bp, "untyped_data", UntypedMsg) + assert isinstance(transport, pZenohTransport) + assert "dimos/" in transport.topic + + def test_zenoh_raises_when_not_available(self, mocker) -> None: + mocker.patch.object(global_config, "transport", "zenoh") + mocker.patch("dimos.core.coordination.module_coordinator.ZENOH_AVAILABLE", False) + + bp = self._make_blueprint() + with pytest.raises(RuntimeError, match="eclipse-zenoh is not installed"): + _get_transport_for(bp, "typed_data", TypedMsg) + + +class TestConfiguratorGating: + def test_lcm_configurators_run_when_transport_is_lcm(self, mocker) -> None: + mocker.patch.object(global_config, "transport", "lcm") + mock_lcm_configs = mocker.patch( + "dimos.protocol.service.system_configurator.lcm_config.lcm_configurators", + return_value=[], + ) + mocker.patch("dimos.protocol.service.system_configurator.base.configure_system") + + bp = autoconnect(ProducerModule.blueprint(), ConsumerModule.blueprint()) + _run_configurators(bp) + + mock_lcm_configs.assert_called_once() + + def test_lcm_configurators_skipped_when_transport_is_zenoh(self, mocker) -> None: + mocker.patch.object(global_config, "transport", "zenoh") + mock_lcm_configs = mocker.patch( + "dimos.protocol.service.system_configurator.lcm_config.lcm_configurators", + return_value=[], + ) + mocker.patch("dimos.protocol.service.system_configurator.base.configure_system") + + bp = autoconnect(ProducerModule.blueprint(), ConsumerModule.blueprint()) + _run_configurators(bp) + + mock_lcm_configs.assert_not_called() + + +@pytest.mark.skipif(not ZENOH_AVAILABLE, reason="zenoh not installed") +class TestZenohTransportWrapper: + """Test ZenohTransport and pZenohTransport broadcast/subscribe lifecycle.""" + + @pytest.fixture(autouse=True) + def _clean_sessions(self): + from dimos.protocol.service.zenohservice import _sessions + + yield + for s in _sessions.values(): + s.close() + _sessions.clear() + + def test_zenoh_transport_broadcast_and_subscribe(self) -> None: + import threading + + import numpy as np + + from dimos.core.transport import ZenohTransport + + t = ZenohTransport("dimos/test/transport", Image) + t.start() + + received = [] + event = threading.Event() + + def cb(msg): # type: ignore[no-untyped-def] + received.append(msg) + event.set() + + t.subscribe(cb) + test_img = Image(np.zeros((2, 2, 3), dtype=np.uint8)) + retry_until(event, lambda: t.broadcast(None, test_img)) + assert isinstance(received[0], Image) + t.stop() + + def test_pzenoh_transport_broadcast_and_subscribe(self) -> None: + import threading + + from dimos.core.transport import pZenohTransport + + t = pZenohTransport("dimos/test/pickle_transport") + t.start() + + received = [] + event = threading.Event() + + def cb(msg): # type: ignore[no-untyped-def] + received.append(msg) + event.set() + + t.subscribe(cb) + retry_until(event, lambda: t.broadcast(None, {"key": "value"})) + assert received[0] == {"key": "value"} + t.stop() + + def test_auto_start_on_broadcast(self) -> None: + from dimos.core.transport import pZenohTransport + + t = pZenohTransport("dimos/test/autostart") + # Don't call start() — broadcast should auto-start + t.broadcast(None, "test") + assert t._started + t.stop() + + def test_stop_and_restart(self) -> None: + from dimos.core.transport import pZenohTransport + + t = pZenohTransport("dimos/test/restart") + t.start() + assert t._started + t.stop() + assert not t._started + t.start() + assert t._started + t.stop() diff --git a/dimos/core/transport.py b/dimos/core/transport.py index c6a0129f20..c8579dd926 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -19,6 +19,7 @@ TYPE_CHECKING, Any, TypeVar, + cast, ) from dimos.core.stream import In, Out, Stream, Transport @@ -32,6 +33,13 @@ except ImportError: DDS_AVAILABLE = False +try: + import zenoh as _zenoh # noqa: F401 + + ZENOH_AVAILABLE = True +except ImportError: + ZENOH_AVAILABLE = False + from dimos.protocol.pubsub.impl.lcmpubsub import LCM, PickleLCM, Topic as LCMTopic from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory @@ -327,4 +335,89 @@ def subscribe( return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) -class ZenohTransport(PubSubTransport[T]): ... +if ZENOH_AVAILABLE: + from dimos.protocol.pubsub.impl.zenohpubsub import ( + PickleZenoh, + Topic as ZenohTopic, + Zenoh, + ) + + class ZenohTransport(PubSubTransport[T]): + """Zenoh transport with LCM encoding for typed DimosMsg.""" + + _started: bool = False + + def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(LCMTopic(topic, type)) + self.zenoh = Zenoh(**kwargs) + self._start_lock = threading.RLock() + + def __reduce__(self): # type: ignore[no-untyped-def] + return (ZenohTransport, (self.topic.topic, self.topic.lcm_type)) + + def start(self) -> None: + with self._start_lock: + if not self._started: + self.zenoh.start() + self._started = True + + def stop(self) -> None: + with self._start_lock: + if self._started: + self.zenoh.stop() + self._started = False + + def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] + with self._start_lock: + if not self._started: + self.start() + self.zenoh.publish(self.topic, msg) + + def subscribe( + self, callback: Callable[[T], None], selfstream: Stream[T] | None = None + ) -> Callable[[], None]: + with self._start_lock: + if not self._started: + self.start() + return self.zenoh.subscribe(self.topic, lambda msg, topic: callback(cast("T", msg))) + + class pZenohTransport(PubSubTransport[T]): + """Zenoh transport with pickle encoding for arbitrary Python objects.""" + + _started: bool = False + + def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(topic) + self.zenoh = PickleZenoh(**kwargs) + self._start_lock = threading.RLock() + + def __reduce__(self): # type: ignore[no-untyped-def] + return (pZenohTransport, (self.topic,)) + + def start(self) -> None: + with self._start_lock: + if not self._started: + self.zenoh.start() + self._started = True + + def stop(self) -> None: + with self._start_lock: + if self._started: + self.zenoh.stop() + self._started = False + + def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] + with self._start_lock: + if not self._started: + self.start() + self.zenoh.publish(ZenohTopic(self.topic), msg) + + def subscribe( + self, callback: Callable[[T], None], selfstream: Stream[T] | None = None + ) -> Callable[[], None]: + with self._start_lock: + if not self._started: + self.start() + return self.zenoh.subscribe( + ZenohTopic(self.topic), lambda msg, topic: callback(msg) + ) diff --git a/dimos/protocol/pubsub/benchmark/testdata.py b/dimos/protocol/pubsub/benchmark/testdata.py index a5c59dc00e..d75a0e0781 100644 --- a/dimos/protocol/pubsub/benchmark/testdata.py +++ b/dimos/protocol/pubsub/benchmark/testdata.py @@ -272,6 +272,33 @@ def redis_msggen(size: int) -> tuple[str, Any]: print("Redis not available") +from dimos.core.transport import ZENOH_AVAILABLE + +if ZENOH_AVAILABLE: + from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, Zenoh + from dimos.protocol.service.zenohservice import _sessions as _zenoh_sessions + + @contextmanager + def zenoh_pubsub_channel() -> Generator[Zenoh, None, None]: + zenoh_pubsub = Zenoh() + zenoh_pubsub.start() + yield zenoh_pubsub + zenoh_pubsub.stop() + for s in _zenoh_sessions.values(): + s.close() # type: ignore[no-untyped-call] + _zenoh_sessions.clear() + + def zenoh_msggen(size: int) -> tuple[ZenohTopic, Image]: + return (ZenohTopic("dimos/benchmark/zenoh", Image), make_data_image(size)) + + testcases.append( + Case( + pubsub_context=zenoh_pubsub_channel, + msg_gen=zenoh_msggen, + ) + ) + + from dimos.protocol.pubsub.impl.rospubsub import ( ROS_AVAILABLE, DimosROS, diff --git a/dimos/protocol/pubsub/impl/test_zenohpubsub.py b/dimos/protocol/pubsub/impl/test_zenohpubsub.py new file mode 100644 index 0000000000..364242ebbb --- /dev/null +++ b/dimos/protocol/pubsub/impl/test_zenohpubsub.py @@ -0,0 +1,211 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for ZenohPubSubBase — raw bytes pub/sub over Zenoh.""" + +from __future__ import annotations + +import threading + +import pytest + +pytest.importorskip("zenoh") + +from dimos.core.test_utils import retry_until +from dimos.protocol.pubsub.impl.lcmpubsub import Topic +from dimos.protocol.pubsub.impl.zenohpubsub import ZenohPubSubBase +from dimos.protocol.service.zenohservice import _sessions + + +@pytest.fixture() +def pubsub(): + """Create and start a ZenohPubSubBase instance, clean up after.""" + # Each test gets a fresh session to avoid thread leak detection + for session in _sessions.values(): + session.close() + _sessions.clear() + + ps = ZenohPubSubBase() + ps.start() + yield ps + ps.stop() + # Close sessions so Zenoh's internal threads are joined + for session in _sessions.values(): + session.close() + _sessions.clear() + + +class TestZenohPubSubBase: + def test_publish_and_subscribe(self, pubsub) -> None: + received = [] + event = threading.Event() + topic = Topic("dimos/test/basic") + + def callback(msg: bytes, t: Topic) -> None: + received.append(msg) + event.set() + + pubsub.subscribe(topic, callback) + retry_until(event, lambda: pubsub.publish(topic, b"hello zenoh")) + assert received[0] == b"hello zenoh" + + def test_multiple_subscribers(self, pubsub) -> None: + received_a: list[bytes] = [] + received_b: list[bytes] = [] + both_received = threading.Event() + countdown = threading.Barrier(2, action=both_received.set) + topic = Topic("dimos/test/multi") + + def callback_a(msg: bytes, t: Topic) -> None: + received_a.append(msg) + countdown.wait() + + def callback_b(msg: bytes, t: Topic) -> None: + received_b.append(msg) + countdown.wait() + + pubsub.subscribe(topic, callback_a) + pubsub.subscribe(topic, callback_b) + retry_until(both_received, lambda: pubsub.publish(topic, b"broadcast")) + assert received_a[-1:] == [b"broadcast"] + assert received_b[-1:] == [b"broadcast"] + + def test_unsubscribe(self, pubsub) -> None: + received: list[bytes] = [] + event = threading.Event() + topic = Topic("dimos/test/unsub") + + def callback(msg: bytes, t: Topic) -> None: + received.append(msg) + event.set() + + unsub = pubsub.subscribe(topic, callback) + retry_until(event, lambda: pubsub.publish(topic, b"before")) + assert received == [b"before"] + + # Unsubscribe and verify no more messages arrive + unsub() + received.clear() + event.clear() + pubsub.publish(topic, b"after") + + # We can't prove a negative with an event, so use a short timeout + assert not event.wait(timeout=0.2), "Received message after unsubscribe" + assert received == [] + + def test_unsubscribe_is_idempotent(self, pubsub) -> None: + topic = Topic("dimos/test/idempotent") + unsub = pubsub.subscribe(topic, lambda msg, t: None) + unsub() + unsub() # should not raise + + def test_publish_before_subscriber_does_not_error(self, pubsub) -> None: + topic = Topic("dimos/test/no_sub") + pubsub.publish(topic, b"orphan message") # should not raise + + def test_stop_cleans_up_publishers_and_subscribers(self, pubsub) -> None: + topic = Topic("dimos/test/cleanup") + pubsub.subscribe(topic, lambda msg, t: None) + pubsub.publish(topic, b"test") + pubsub.stop() + assert len(pubsub._publishers) == 0 + assert len(pubsub._subscribers) == 0 + + def test_subscribe_all(self, pubsub) -> None: + received: list[bytes] = [] + event = threading.Event() + topic = Topic("dimos/test/any/topic") + + def callback(msg: bytes, t: Topic) -> None: + received.append(msg) + event.set() + + pubsub.subscribe_all(callback) + retry_until(event, lambda: pubsub.publish(topic, b"wildcard")) + assert received[-1] == b"wildcard" + + +class TestTopicKeyExprConversion: + """Tests for _topic_to_key_expr and _key_expr_to_topic round-trip.""" + + def test_typed_topic_to_key_expr(self) -> None: + from dimos.msgs.geometry_msgs.Twist import Twist + from dimos.protocol.pubsub.impl.zenohpubsub import _topic_to_key_expr + + topic = Topic("dimos/cmd_vel", lcm_type=Twist) + key = _topic_to_key_expr(topic) + assert key == "dimos/cmd_vel/geometry_msgs.Twist" + + def test_untyped_topic_to_key_expr(self) -> None: + from dimos.protocol.pubsub.impl.zenohpubsub import _topic_to_key_expr + + topic = Topic("dimos/data") + key = _topic_to_key_expr(topic) + assert key == "dimos/data" + + def test_key_expr_to_topic_with_known_type(self) -> None: + from dimos.msgs.geometry_msgs.Twist import Twist + from dimos.protocol.pubsub.impl.zenohpubsub import _key_expr_to_topic + + topic = _key_expr_to_topic("dimos/cmd_vel/geometry_msgs.Twist") + assert topic.topic == "dimos/cmd_vel" + assert topic.lcm_type is Twist + + def test_key_expr_to_topic_with_unknown_type(self) -> None: + from dimos.protocol.pubsub.impl.zenohpubsub import _key_expr_to_topic + + topic = _key_expr_to_topic("dimos/data/unknown.FooBar") + # Last segment doesn't resolve — entire string becomes the topic + assert topic.topic == "dimos/data/unknown.FooBar" + assert topic.lcm_type is None + + def test_key_expr_to_topic_with_no_slash(self) -> None: + from dimos.protocol.pubsub.impl.zenohpubsub import _key_expr_to_topic + + topic = _key_expr_to_topic("simple_topic") + assert topic.topic == "simple_topic" + assert topic.lcm_type is None + + def test_key_expr_to_topic_uses_default_type(self) -> None: + from dimos.msgs.geometry_msgs.Twist import Twist + from dimos.protocol.pubsub.impl.zenohpubsub import _key_expr_to_topic + + topic = _key_expr_to_topic("dimos/data", default_lcm_type=Twist) + assert topic.topic == "dimos/data" + assert topic.lcm_type is Twist + + def test_round_trip_typed(self) -> None: + from dimos.msgs.sensor_msgs.Image import Image + from dimos.protocol.pubsub.impl.zenohpubsub import ( + _key_expr_to_topic, + _topic_to_key_expr, + ) + + original = Topic("dimos/color_image", lcm_type=Image) + key = _topic_to_key_expr(original) + reconstructed = _key_expr_to_topic(key) + assert reconstructed.topic == original.topic + assert reconstructed.lcm_type is original.lcm_type + + def test_round_trip_untyped(self) -> None: + from dimos.protocol.pubsub.impl.zenohpubsub import ( + _key_expr_to_topic, + _topic_to_key_expr, + ) + + original = Topic("dimos/gps_location") + key = _topic_to_key_expr(original) + reconstructed = _key_expr_to_topic(key) + assert reconstructed.topic == original.topic + assert reconstructed.lcm_type is None diff --git a/dimos/protocol/pubsub/impl/zenohpubsub.py b/dimos/protocol/pubsub/impl/zenohpubsub.py new file mode 100644 index 0000000000..250b2b5818 --- /dev/null +++ b/dimos/protocol/pubsub/impl/zenohpubsub.py @@ -0,0 +1,205 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zenoh PubSub implementation. + +Provides raw bytes pub/sub over Zenoh (ZenohPubSubBase) and +encoder-composed variants (Zenoh, PickleZenoh). +""" + +from __future__ import annotations + +from collections.abc import Callable +import threading +from typing import TYPE_CHECKING, Any + +from dimos.protocol.pubsub.encoders import LCMEncoderMixin, PickleEncoderMixin +from dimos.protocol.pubsub.impl.lcmpubsub import Topic +from dimos.protocol.pubsub.spec import AllPubSub +from dimos.protocol.service.zenohservice import ZenohService +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + import zenoh + +logger = setup_logger() + + +def _topic_to_key_expr(topic: Topic) -> str: + """Convert a Topic to a Zenoh key expression. + + Embeds the lcm_type in the key using '/' instead of '#' (which is + forbidden in Zenoh key expressions). This mirrors how LCM channels + carry type info in the channel name for subscribe_all decoding. + + Examples: + Topic("dimos/cmd_vel", Twist) → "dimos/cmd_vel/geometry_msgs.Twist" + Topic("dimos/data") → "dimos/data" + + Known limitation: the type name becomes a path segment. If a topic + name itself looks like a type name (e.g., "dimos/geometry_msgs.Twist"), + _key_expr_to_topic may misparse it on the receiving end. In practice + this doesn't happen because topic names come from stream names (e.g., + "cmd_vel", "lidar"), not from type names. + """ + base = topic.topic if isinstance(topic.topic, str) else topic.pattern + if topic.lcm_type is not None: + return f"{base}/{topic.lcm_type.msg_name}" + return base + + +def _key_expr_to_topic(key_expr: str, default_lcm_type: type | None = None) -> Topic: + """Reconstruct a Topic from a Zenoh key expression. + + Parses the last '/' segment and attempts to resolve it as a DimosMsg + type via resolve_msg_type(). If resolution succeeds, the segment is + treated as the type suffix and the remainder as the base topic. + + Examples: + "dimos/cmd_vel/geometry_msgs.Twist" → Topic("dimos/cmd_vel", Twist) + "dimos/data" → Topic("dimos/data", default_lcm_type) + "dimos/data/unknown.Foo" → Topic("dimos/data/unknown.Foo", default_lcm_type) + + Known limitation: if a topic's base path ends with a segment that + happens to match a registered DimosMsg type name, this function will + incorrectly split it. See _topic_to_key_expr docstring for details. + """ + from dimos.msgs.helpers import resolve_msg_type + + # Try to resolve the last segment as a message type + parts = key_expr.rsplit("/", 1) + if len(parts) == 2: + base, maybe_type = parts + lcm_type = resolve_msg_type(maybe_type) + if lcm_type is not None: + return Topic(topic=base, lcm_type=lcm_type) + return Topic(topic=key_expr, lcm_type=default_lcm_type) + + +class ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes]): + """Raw bytes pub/sub over Zenoh. + + Publishers are cached per-topic to avoid re-declaring on every publish. + Subscribers are tracked for cleanup on stop(). + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._publishers: dict[str, zenoh.Publisher] = {} + self._publisher_lock = threading.Lock() + self._subscribers: list[zenoh.Subscriber[Any]] = [] + self._subscriber_lock = threading.Lock() + + def _get_publisher(self, key_expr: str) -> zenoh.Publisher: + """Get or create a Publisher for the given key expression.""" + with self._publisher_lock: + if key_expr not in self._publishers: + self._publishers[key_expr] = self.session.declare_publisher(key_expr) + return self._publishers[key_expr] + + def publish(self, topic: Topic, message: bytes) -> None: + """Publish bytes to a Zenoh key expression. + + Transport-level errors (session closed, invalid key expression) are + logged but not raised. Delivery guarantees are handled by Zenoh's + reliability protocol (RELIABLE mode retransmits at each hop) — these + do not surface as exceptions from put(). + """ + key_expr = _topic_to_key_expr(topic) + try: + publisher = self._get_publisher(key_expr) + publisher.put(message) + except Exception: + logger.error(f"Error publishing to {key_expr}", exc_info=True) + + def subscribe( + self, topic: Topic, callback: Callable[[bytes, Topic], None] + ) -> Callable[[], None]: + """Subscribe to a Zenoh key expression. + + Returns an unsubscribe callable. + """ + key_expr = _topic_to_key_expr(topic) + + def on_sample(sample: zenoh.Sample) -> None: + try: + data = sample.payload.to_bytes() + except Exception: + logger.error(f"Error reading payload from {key_expr}", exc_info=True) + return + # Reconstruct topic with type info from the key expression + # (needed for subscribe_all where the subscription topic has no lcm_type) + recv_topic = _key_expr_to_topic(str(sample.key_expr), topic.lcm_type) + callback(data, recv_topic) + + sub = self.session.declare_subscriber(key_expr, on_sample) + with self._subscriber_lock: + self._subscribers.append(sub) + + undeclared = False + + def unsubscribe() -> None: + nonlocal undeclared + if undeclared: + return + undeclared = True + with self._subscriber_lock: + if sub not in self._subscribers: + return # Already removed by stop() — stop() owns the undeclare + self._subscribers.remove(sub) + sub.undeclare() # type: ignore[no-untyped-call] + + return unsubscribe + + def subscribe_all(self, callback: Callable[[bytes, Topic], Any]) -> Callable[[], None]: + """Subscribe to all dimos key expressions via wildcard.""" + return self.subscribe(Topic("dimos/**"), callback) + + def stop(self) -> None: + """Clean up publishers and subscribers.""" + with self._subscriber_lock: + for subscriber in self._subscribers: + subscriber.undeclare() # type: ignore[no-untyped-call] + self._subscribers.clear() + with self._publisher_lock: + for publisher in self._publishers.values(): + publisher.undeclare() # type: ignore[no-untyped-call] + self._publishers.clear() + super().stop() + + +class Zenoh( # type: ignore[misc] + LCMEncoderMixin, # type: ignore[type-arg] + ZenohPubSubBase, +): + """Zenoh pub/sub with LCM encoding for typed DimosMsg.""" + + ... + + +class PickleZenoh( # type: ignore[misc] + PickleEncoderMixin, # type: ignore[type-arg] + ZenohPubSubBase, +): + """Zenoh pub/sub with pickle encoding for arbitrary Python objects.""" + + ... + + +__all__ = [ + "PickleZenoh", + "Topic", + "Zenoh", + "ZenohPubSubBase", +] diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index 0907e662d5..ef0a5538a9 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -145,6 +145,49 @@ def shared_memory_cpu_context() -> Generator[PickleSharedMemory, None, None]: ) +from dimos.core.transport import ZENOH_AVAILABLE + +if ZENOH_AVAILABLE: + from dimos.protocol.pubsub.impl.zenohpubsub import PickleZenoh, Zenoh + from dimos.protocol.service.zenohservice import _sessions as _zenoh_sessions + + @contextmanager + def zenoh_lcm_context() -> Generator[Zenoh, None, None]: + zenoh_pubsub = Zenoh() + zenoh_pubsub.start() + yield zenoh_pubsub + zenoh_pubsub.stop() + for s in _zenoh_sessions.values(): + s.close() + _zenoh_sessions.clear() + + testdata.append( + ( + zenoh_lcm_context, + Topic(topic="dimos/test/spec", lcm_type=Vector3), + [Vector3(1, 2, 3), Vector3(4, 5, 6), Vector3(7, 8, 9)], + ) + ) + + @contextmanager + def zenoh_pickle_context() -> Generator[PickleZenoh, None, None]: + zenoh_pubsub = PickleZenoh() + zenoh_pubsub.start() + yield zenoh_pubsub + zenoh_pubsub.stop() + for s in _zenoh_sessions.values(): + s.close() + _zenoh_sessions.clear() + + testdata.append( + ( + zenoh_pickle_context, + Topic("dimos/test/spec/pickle"), + [{"key": "value1"}, {"key": "value2"}, {"key": "value3"}], + ) + ) + + @pytest.mark.parametrize("pubsub_context, topic, values", testdata) def test_store(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None: with pubsub_context() as x: diff --git a/dimos/protocol/service/test_zenohservice.py b/dimos/protocol/service/test_zenohservice.py new file mode 100644 index 0000000000..34852d9292 --- /dev/null +++ b/dimos/protocol/service/test_zenohservice.py @@ -0,0 +1,84 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for ZenohService — singleton session management.""" + +from __future__ import annotations + +import pytest + +pytest.importorskip("zenoh") + +from dimos.protocol.service.zenohservice import ZenohConfig, ZenohService, _sessions + + +@pytest.fixture(autouse=True) +def _clear_sessions(): + """Clear the global session cache before each test.""" + yield + # Close and remove all sessions after each test + for session in _sessions.values(): + session.close() + _sessions.clear() + + +class TestZenohConfig: + def test_default_mode_is_peer(self) -> None: + config = ZenohConfig() + assert config.mode == "peer" + + def test_session_key_is_stable(self) -> None: + config = ZenohConfig() + assert config.session_key == config.session_key + + def test_different_modes_produce_different_keys(self) -> None: + peer = ZenohConfig(mode="peer") + client = ZenohConfig(mode="client") + assert peer.session_key != client.session_key + + +class TestZenohService: + def test_start_creates_session(self) -> None: + svc = ZenohService() + svc.start() + assert svc.session is not None + + def test_two_services_share_session(self) -> None: + svc1 = ZenohService() + svc2 = ZenohService() + svc1.start() + svc2.start() + assert svc1.session is svc2.session + + def test_stop_does_not_close_shared_session(self) -> None: + svc1 = ZenohService() + svc2 = ZenohService() + svc1.start() + svc2.start() + svc1.stop() + # svc2's session should still be valid + assert svc2.session is not None + + def test_session_before_start_raises(self) -> None: + svc = ZenohService() + with pytest.raises(RuntimeError, match="not initialized"): + _ = svc.session + + def test_start_is_idempotent(self) -> None: + svc = ZenohService() + svc.start() + session1 = svc.session + svc.start() + session2 = svc.session + assert session1 is session2 diff --git a/dimos/protocol/service/zenohservice.py b/dimos/protocol/service/zenohservice.py new file mode 100644 index 0000000000..2687a2e1d5 --- /dev/null +++ b/dimos/protocol/service/zenohservice.py @@ -0,0 +1,86 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zenoh session management — singleton pattern following DDSService.""" + +from __future__ import annotations + +import json +import threading +from typing import Any + +import zenoh + +from dimos.protocol.service.spec import BaseConfig, Service + +zenoh.init_log_from_env_or("warn") +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +_sessions: dict[str, zenoh.Session] = {} +_sessions_lock = threading.Lock() + + +class ZenohConfig(BaseConfig): + """Configuration for Zenoh service.""" + + mode: str = "peer" + connect: list[str] = [] + listen: list[str] = [] + + @property + def session_key(self) -> str: + """Produce a hashable key for singleton session lookup.""" + return f"{self.mode}|{json.dumps(sorted(self.connect))}|{json.dumps(sorted(self.listen))}" + + +class ZenohService(Service): + config: ZenohConfig + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + def start(self) -> None: + """Start the Zenoh service — opens a session if one doesn't exist for this config.""" + key = self.config.session_key + with _sessions_lock: + if key not in _sessions: + config = zenoh.Config() + config.insert_json5("mode", json.dumps(self.config.mode)) + if self.config.connect: + config.insert_json5("connect/endpoints", json.dumps(self.config.connect)) + if self.config.listen: + config.insert_json5("listen/endpoints", json.dumps(self.config.listen)) + _sessions[key] = zenoh.open(config) + logger.debug(f"Zenoh session opened in {self.config.mode} mode") + super().start() + + def stop(self) -> None: + """Stop the Zenoh service — does NOT close the shared session.""" + super().stop() + + @property + def session(self) -> zenoh.Session: + """Get the Zenoh Session instance for this service's config.""" + key = self.config.session_key + if key not in _sessions: + raise RuntimeError("Zenoh session not initialized — call start() first") + return _sessions[key] + + +__all__ = [ + "ZenohConfig", + "ZenohService", +] diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index f4a7e6f226..1955174b22 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -40,6 +40,7 @@ import typer from dimos.core.core import rpc +from dimos.core.global_config import global_config from dimos.core.module import Module, ModuleConfig from dimos.msgs.sensor_msgs.PointCloud2 import register_colormap_annotation from dimos.protocol.pubsub.impl.lcmpubsub import LCM @@ -172,8 +173,59 @@ def _resolve_viewer_mode() -> ViewerMode: return _BACKEND_TO_MODE.get(global_config.viewer, "native") +def _default_pubsubs(config: Any = None) -> list[SubscribeAllCapable[Any, Any]]: + """Select the pubsub backend based on the active transport. + + When transport is Zenoh, we listen on BOTH Zenoh and LCM because + TF (transform frames) is currently hardcoded to LCM in the Module + base class. Without LCM, the robot pose won't update in the viewer. + + If transport is zenoh but eclipse-zenoh is not installed, raises + ``RuntimeError`` (same message as stream transport selection) instead of + falling back to LCM only. + """ + transport = getattr(config, "transport", None) or global_config.transport + if transport == "zenoh": + from dimos.core.transport import ZENOH_AVAILABLE + + if not ZENOH_AVAILABLE: + raise RuntimeError( + "transport='zenoh' but eclipse-zenoh is not installed. " + "Install with: uv sync --extra zenoh" + ) + from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh + + return [Zenoh(), LCM()] + return [LCM()] + + +def _resolve_pubsubs(config: Any) -> list[SubscribeAllCapable[Any, Any]]: + """Return explicit pubsubs when truly overridden, else transport defaults. + + Older blueprints commonly passed ``pubsubs=[LCM()]`` as the effective + default. Preserve the newer transport-driven behavior for that legacy + value, but honor explicit non-default overrides such as custom backends. + """ + fields_set: set[str] = cast("set[str]", getattr(config, "model_fields_set", set())) + pubsubs = cast( + "list[SubscribeAllCapable[Any, Any]] | None", + getattr(config, "pubsubs", None), + ) + if "pubsubs" in fields_set and pubsubs is not None: + is_legacy_default = len(pubsubs) == 1 and isinstance(pubsubs[0], LCM) + if not is_legacy_default: + return pubsubs + return _default_pubsubs(getattr(config, "g", config)) + + class Config(ModuleConfig): - """Configuration for RerunBridgeModule.""" + """Configuration for RerunBridgeModule. + + The pubsubs field is accepted for backwards compatibility. The legacy + ``[LCM()]`` value is treated as the old default and replaced by the + transport-driven runtime default. Explicit non-default overrides are still + honored. + """ pubsubs: list[SubscribeAllCapable[Any, Any]] = field(default_factory=lambda: [LCM()]) @@ -259,8 +311,15 @@ def _get_entity_path(self, topic: Any) -> str: # Default: use topic.name if available (LCM Topic), else str topic_str = getattr(topic, "name", None) or str(topic) - # Strip everything after # (LCM topic suffix) + # Strip type suffix: LCM uses '#type', Zenoh embeds type as '/type' in key expr + # but _key_expr_to_topic already parsed it into topic.topic, so use that. + raw = getattr(topic, "topic", topic_str) + if isinstance(raw, str): + topic_str = raw topic_str = topic_str.split("#")[0] + # Strip Zenoh key prefix (dimos/) to match LCM entity paths + if topic_str.startswith("dimos/"): + topic_str = "/" + topic_str.removeprefix("dimos/") return f"{self.config.entity_prefix}{topic_str}" def _on_message(self, msg: Any, topic: Any) -> None: @@ -333,8 +392,13 @@ def start(self) -> None: # Register colormap for viewer-side color resolution (PointCloud2 class_ids) register_colormap_annotation("turbo") + # Resolve pubsubs lazily — the module-level global_config singleton in worker + # processes doesn't have CLI overrides. Use self.config.g which is the parent's + # updated config, passed via the worker kwargs. + pubsubs = _resolve_pubsubs(self.config) + # Start pubsubs and subscribe to all messages - for pubsub in self.config.pubsubs: + for pubsub in pubsubs: logger.info(f"bridge listening on {pubsub.__class__.__name__}") if hasattr(pubsub, "start"): pubsub.start() @@ -342,7 +406,7 @@ def start(self) -> None: self.register_disposable(Disposable(unsub)) # Add pubsub stop as disposable - for pubsub in self.config.pubsubs: + for pubsub in pubsubs: if hasattr(pubsub, "stop"): self.register_disposable(Disposable(pubsub.stop)) # type: ignore[union-attr] @@ -441,9 +505,6 @@ def run_bridge( bridge = RerunBridgeModule( viewer_mode=viewer_mode, memory_limit=memory_limit, - # any pubsub that supports subscribe_all and topic that supports str(topic) - # is acceptable here - pubsubs=[LCM()], ) bridge.start() diff --git a/dimos/visualization/rerun/test_viewer_integration.py b/dimos/visualization/rerun/test_viewer_integration.py index edcd9e946a..dc38a1ba38 100644 --- a/dimos/visualization/rerun/test_viewer_integration.py +++ b/dimos/visualization/rerun/test_viewer_integration.py @@ -29,6 +29,8 @@ import re import shutil +from dimos.protocol.pubsub.impl.lcmpubsub import LCM + class TestViewerBinaryInstallation: """Verify dimos-viewer binary is installed and functional.""" @@ -120,6 +122,33 @@ def test_bridge_has_fallback(self): ) +class ExplicitPubSubOverride: + def subscribe_all(self, callback): # type: ignore[no-untyped-def] + return lambda: None + + +class TestBridgePubsubResolution: + def test_legacy_lcm_pubsubs_defers_to_transport_default(self): + from dimos.core.global_config import GlobalConfig + from dimos.visualization.rerun.bridge import Config, _resolve_pubsubs + + config = Config(pubsubs=[LCM()], g=GlobalConfig(transport="lcm")) + pubsubs = _resolve_pubsubs(config) + + assert len(pubsubs) == 1 + assert isinstance(pubsubs[0], LCM) + + def test_explicit_custom_pubsubs_override_is_honored(self): + from dimos.core.global_config import GlobalConfig + from dimos.visualization.rerun.bridge import Config, _resolve_pubsubs + + custom = ExplicitPubSubOverride() + config = Config(pubsubs=[custom], g=GlobalConfig(transport="lcm")) + pubsubs = _resolve_pubsubs(config) + + assert pubsubs == [custom] + + def _parse_version(version_str: str) -> tuple[int, int]: """Extract (major, minor) from a version string like '0.29.2' or '0.30.0a2'.""" match = re.match(r"(\d+)\.(\d+)", version_str) diff --git a/docs/development/testing.md b/docs/development/testing.md index c8a226b7ad..0618806c78 100644 --- a/docs/development/testing.md +++ b/docs/development/testing.md @@ -3,7 +3,7 @@ For development, you should install all dependencies so that tests have access to them. ```bash -uv sync --all-extras --no-extra dds +uv sync --all-extras --no-extra dds --no-extra cuda --frozen ``` ## Types of tests @@ -37,7 +37,7 @@ For the purposes of DimOS, slow tests are marked with `@pytest.mark.slow` and fa Run the fast tests: ```bash -./bin/pytest-fast +CI=1 ./bin/pytest-fast ``` This is the same as: @@ -48,16 +48,20 @@ pytest dimos The default `addopts` in `pyproject.toml` includes a `-m` filter that excludes the `slow`/`mujoco`/`tool`. So plain `pytest dimos` only runs fast tests. +For non-interactive runs, especially on macOS, prefer `CI=1`. This skips system configurator prompts and avoids test runs trying to change local multicast or sysctl state. + ### Slow tests Run the slow tests: ```bash -./bin/pytest-slow +CI=1 ./bin/pytest-slow ``` (This is just a shortcut for `pytest -m 'not (tool or mujoco)' dimos`. I.e., run both fast tests and slow tests, but not `tool` or `mujoco`.) +This includes slow agent and MCP-style integration tests in addition to slower transport and module tests. If one of those paths is broken, a failure can take close to a minute to surface because the harness waits for the agent flow to finish before timing out. + When writing or debugging a specific slow test, override `-m` yourself to run it: ```bash diff --git a/docs/installation/osx.md b/docs/installation/osx.md index 016e32ed5b..73014b0fcc 100644 --- a/docs/installation/osx.md +++ b/docs/installation/osx.md @@ -39,3 +39,19 @@ uv run mypy dimos # tests (around a minute to run) uv run pytest dimos ``` + +## Transport note for macOS + +LCM over UDP can be unreliable on macOS for large or high-rate replay workloads. If you are running heavy replay traffic, prefer Zenoh: + +```sh +dimos --transport=zenoh --dtop --replay --replay-dir=unitree_go2_bigoffice run unitree-go2 +``` + +If you are developing on the repository, prefer syncing the full environment with the checked-in lockfile: + +```sh +uv sync --all-extras --no-extra dds --no-extra cuda --frozen +``` + +Do not rely on `uv sync --extra zenoh` in an existing full development environment. That can re-resolve the environment in a way that removes unrelated packages you already had installed. diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 7a25ee4ae3..6746208b20 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -23,6 +23,7 @@ dimos [GLOBAL OPTIONS] COMMAND [ARGS] | `--memory-limit` | TEXT | `auto` | Rerun viewer memory limit | | `--mcp-port` | INT | `9990` | MCP server port | | `--mcp-host` | TEXT | `127.0.0.1` | MCP server bind address | +| `--transport` | `lcm\|zenoh` | platform-dependent | Stream transport backend. Defaults to `zenoh` on macOS when Zenoh is installed, otherwise `lcm`. | | `--dtop` / `--no-dtop` | bool | `False` | Enable live resource monitor overlay | | `--obstacle-avoidance` / `--no-obstacle-avoidance` | bool | `True` | Enable obstacle avoidance | | `--detection-model` | `qwen\|moondream` | `moondream` | Vision model for object detection | @@ -81,6 +82,9 @@ dimos run unitree-go2-agentic --daemon # Replay with Rerun viewer dimos --replay --viewer rerun run unitree-go2 +# Replay Big Office data explicitly over Zenoh +dimos --transport=zenoh --dtop --replay --replay-dir=unitree_go2_bigoffice run unitree-go2 + # Real robot dimos run unitree-go2-agentic --robot-ip 192.168.123.161 @@ -91,6 +95,8 @@ dimos run unitree-go2 keyboard-teleop dimos run unitree-go2-agentic --disable OsmSkill WebInput ``` +On macOS, heavy replay workloads can be unreliable over LCM UDP. If Zenoh is installed, the default transport resolves to `zenoh`; you can still force either path explicitly with `--transport=lcm` or `--transport=zenoh`. + When `--daemon` is used, the process: 1. Builds and starts all modules (foreground — you see errors) 2. Runs a health check (polls worker PIDs) diff --git a/docs/usage/transports/index.md b/docs/usage/transports/index.md index 09ccb484ed..5294a2e859 100644 --- a/docs/usage/transports/index.md +++ b/docs/usage/transports/index.md @@ -29,6 +29,25 @@ So: treat the API as uniform, but pick a backend whose semantics match the task. --- +## Choosing a backend + +For most users, the important choice is between `lcm`, `zenoh`, and shared memory overrides: + +* `lcm`: current legacy default on most platforms. Fast and simple, but UDP multicast is best-effort. +* `zenoh`: network transport with reliable delivery semantics and the same typed message model through `LCMEncoderMixin`. +* shared memory (`pSHMTransport`, etc.): best for large local streams on a single machine. + +At the CLI level, you can select the stream transport globally with: + +```bash +dimos --transport=lcm run unitree-go2 +dimos --transport=zenoh run unitree-go2 +``` + +On macOS, large replay workloads can be unreliable over LCM UDP. If Zenoh is installed, DimOS defaults the global stream transport to `zenoh`; otherwise it falls back to `lcm`. + +--- + ## Benchmarks Quick view on performance of our pubsub backends: @@ -265,6 +284,24 @@ lcm.stop() Received velocity: x=1.0, y=0.0, z=0.5 ``` +### Zenoh + +Zenoh provides network pubsub without relying on UDP multicast for the user-facing stream transport. In DimOS it carries the same typed messages by encoding them with `LCMEncoderMixin`, so existing `dimos.msgs.*` types still work. + +Use Zenoh when: + +* you want a transport that behaves better than UDP multicast on macOS +* you are replaying large or high-rate data and want a more reliable network path +* you want to keep the DimOS typed stream model while changing the transport backend + +At the stream level, the transport wrappers are `ZenohTransport` and `pZenohTransport`. At the CLI level, the usual entry point is: + +```bash +dimos --transport=zenoh --dtop --replay --replay-dir=unitree_go2_bigoffice run unitree-go2 +``` + +The Rerun bridge also follows the global transport. When `transport=zenoh`, the bridge listens on Zenoh and on LCM for TF data. + ### Shared memory (IPC) Shared memory is highest performance, but only works on the **same machine**. @@ -434,6 +471,7 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b | `Memory` | Testing only, single process | No | No | Minimal reference impl | | `SharedMemory` | Multi-process on same machine | Yes | No | Highest throughput (IPC) | | `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN | +| `Zenoh` | Reliable network stream transport | Yes | Yes | Recommended on macOS for heavy replay | | `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop | | `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools | | `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP | diff --git a/pyproject.toml b/pyproject.toml index fa35dd79de..69e3c17280 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -311,6 +311,10 @@ dds = [ "cyclonedds>=0.10.5", ] +zenoh = [ + "eclipse-zenoh>=1.0.0,<2.0", +] + # Minimal dependencies for Docker modules that communicate with the DimOS host docker = [ "dimos-lcm", diff --git a/uv.lock b/uv.lock index aebf6f9055..7e08392e39 100644 --- a/uv.lock +++ b/uv.lock @@ -1978,6 +1978,9 @@ web = [ { name = "sse-starlette" }, { name = "uvicorn" }, ] +zenoh = [ + { name = "eclipse-zenoh" }, +] [package.metadata] requires-dist = [ @@ -2001,6 +2004,7 @@ requires-dist = [ { name = "dimos-viewer", marker = "extra == 'visualization'", specifier = ">=0.30.0a4" }, { name = "drake", marker = "platform_machine != 'aarch64' and sys_platform == 'darwin' and extra == 'manipulation'", specifier = "==1.45.0" }, { name = "drake", marker = "platform_machine != 'aarch64' and sys_platform != 'darwin' and extra == 'manipulation'", specifier = ">=1.40.0" }, + { name = "eclipse-zenoh", marker = "extra == 'zenoh'", specifier = ">=1.0.0,<2.0" }, { name = "edgetam-dimos", marker = "extra == 'misc'" }, { name = "einops", marker = "extra == 'misc'", specifier = "==0.8.1" }, { name = "empy", marker = "extra == 'misc'", specifier = "==3.3.4" }, @@ -2148,7 +2152,7 @@ requires-dist = [ { name = "xformers", marker = "platform_machine == 'x86_64' and extra == 'cuda'", specifier = ">=0.0.20" }, { name = "yapf", marker = "extra == 'misc'", specifier = "==0.40.2" }, ] -provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "manipulation", "cpu", "cuda", "dev", "psql", "sim", "drone", "dds", "docker", "base"] +provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "manipulation", "cpu", "cuda", "dev", "psql", "sim", "drone", "dds", "zenoh", "docker", "base"] [[package]] name = "dimos-lcm" @@ -2304,6 +2308,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" }, ] +[[package]] +name = "eclipse-zenoh" +version = "1.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d9/42/c8502d0e77f74b9cf4c192a01e620b3d15273d371464485796807d202d9d/eclipse_zenoh-1.9.0.tar.gz", hash = "sha256:b0477ab431132ebfe1096eccac13ea0066d50d1528d726c8872c00e0345070d1", size = 164557, upload-time = "2026-04-10T13:23:35.883Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e7/3b/22b9104b0a022bd2b1627b4866876831585eda2eacb9ca1f3b4b8e847945/eclipse_zenoh-1.9.0-cp39-abi3-linux_armv6l.whl", hash = "sha256:15b6f37c407617ea4de32d32835cbcab4d1a116b892477490fc6c10a7d27c73b", size = 10664168, upload-time = "2026-04-10T13:23:15.008Z" }, + { url = "https://files.pythonhosted.org/packages/05/c5/ee0815c7ec49c5a29307cd935478305159bb3f0b2489f8c54fc6db3fdf36/eclipse_zenoh-1.9.0-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:6f66059b12e1ec53c70bc25192b0e74502751759064726dbb153ed6dd8f4dc8b", size = 19942168, upload-time = "2026-04-10T13:23:17.785Z" }, + { url = "https://files.pythonhosted.org/packages/7b/6a/42b83b4e8c262ebbb3bcae702394478326c807f54b3162130b0a603e1a01/eclipse_zenoh-1.9.0-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:180dd2a6da3b86b52e87f5e470a1f8a86db03c519978b22ffb1dc7c11f98ef3b", size = 10225694, upload-time = "2026-04-10T13:23:20.244Z" }, + { url = "https://files.pythonhosted.org/packages/27/57/28e66893801b63df36fea355a64b6fc22637e1148a952ee11e3039ae955e/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:949d82851bc9e3ad646fd1307ee544ed23359dcfd18d4065075fc592f6ab6fa7", size = 10517069, upload-time = "2026-04-10T13:23:23.053Z" }, + { url = "https://files.pythonhosted.org/packages/f0/2f/be614f1f7f4e046da2764cd36227d19db3655839219744ce7a12e6e2dae6/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a1fe847225cda21e3e74677cfd4ddfd2e72600d5a56968d4229d981c67f78d4", size = 11580068, upload-time = "2026-04-10T13:23:25.594Z" }, + { url = "https://files.pythonhosted.org/packages/58/1b/2a074d4f4595bd37c3d12f1b2ad49bceef5c8cd0962cbfd97d1d39f32e1f/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:43299593891cfd648bca4b2aa00f3dca916508a49a0c9e6960902e6e867b247e", size = 10537556, upload-time = "2026-04-10T13:23:28.414Z" }, + { url = "https://files.pythonhosted.org/packages/ab/33/c3116f1bf7647ee0ea8972efbe0fe5710ae75ea7226440a8fda7f04a4cbc/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:8c139a43706c8ff3c94fa625008af8667687c161a8395ad1fa3faff29c16fae4", size = 10721249, upload-time = "2026-04-10T13:23:30.843Z" }, + { url = "https://files.pythonhosted.org/packages/26/16/a94c4f37e3a088faadf4b5fbc64e5f69dea1023dc7efc49b3be0e0ecc953/eclipse_zenoh-1.9.0-cp39-abi3-win_amd64.whl", hash = "sha256:5dfb352eca4585b85edbbc84c6db58906008e202823ca280496c0b867f9719f0", size = 9124510, upload-time = "2026-04-10T13:23:34.119Z" }, +] + [[package]] name = "edgetam-dimos" version = "1.0"