Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
61fbbb1
refactor: scaffold conditional logic for Zenoh transport
vrinek Apr 12, 2026
2c95ebc
feat: implement ZenohService with singleton session management
vrinek Apr 12, 2026
7442ca9
feat: implement ZenohPubSubBase with raw bytes pub/sub
vrinek Apr 12, 2026
68d8ef5
feat: add Zenoh and PickleZenoh to spec conformance tests
vrinek Apr 12, 2026
5f8428e
feat: complete ZenohTransport and pZenohTransport wrappers
vrinek Apr 12, 2026
4477c2e
feat: add Zenoh to transport benchmark suite
vrinek Apr 12, 2026
d7e8d36
docs: document error handling contract in ZenohPubSubBase.publish()
vrinek Apr 12, 2026
6d72cb5
fix: race condition in unsubscribe/stop and payload error handling
vrinek Apr 12, 2026
1fa6021
refactor: replace try/except with conditional in unsubscribe
vrinek Apr 13, 2026
fbb4bb4
fix: Rerun bridge visualization over Zenoh transport
vrinek Apr 13, 2026
3efda9c
docs: remove phase reference from test docstring
vrinek Apr 13, 2026
1d5805e
refactor: rename test module streams for clarity
vrinek Apr 13, 2026
efedbd4
style: fix import sorting and formatting (ruff)
vrinek Apr 13, 2026
b6fa65d
refactor: extract wait_for_subscribers/wait_for_delivery test helpers
vrinek Apr 13, 2026
7822ad0
fix: race condition in test_multiple_subscribers
vrinek Apr 13, 2026
be35828
refactor: remove unused Config.pubsubs field and dual topic state
vrinek Apr 13, 2026
b4493da
test: add unit tests for _topic_to_key_expr and _key_expr_to_topic
vrinek Apr 13, 2026
06d4ea0
fix: restore Config.pubsubs field for backwards compatibility
vrinek Apr 13, 2026
c7f5ca4
chore: downgrade Zenoh session log from info to debug
vrinek Apr 13, 2026
b2ffbc5
fix: bridge listens on both Zenoh and LCM for TF data
vrinek Apr 13, 2026
a523722
fix: replace sleep-based test helpers with retry loop
vrinek Apr 14, 2026
c806297
feat: initialize Zenoh logging from RUST_LOG env var
vrinek Apr 14, 2026
fef846e
fix: drop dimos[dev] from zenoh optional dependency group
vrinek Apr 14, 2026
c8591ab
chore: apply ruff format/check after rebase
bogwi Apr 22, 2026
2f3a1bf
fix: align ZenohService with Service config pattern; fix ZENOH guard …
bogwi Apr 22, 2026
694766e
fix(rerun): fail fast in _default_pubsubs when zenoh is requested but…
bogwi Apr 22, 2026
4972589
fix(zenoh): resolve mypy errors in zenoh pubsub, transport, and bench…
bogwi Apr 23, 2026
0216d83
fix(mac): default Zenoh transport and document replay workflow
bogwi Apr 23, 2026
e82b5dc
fix zenoh test collection and rerun pubsub overrides
bogwi Apr 23, 2026
89702c1
fix(zenoh): skip zenohservice tests when dependency is missing
bogwi Apr 23, 2026
284bb56
fix(zenoh): validate global transport selection
bogwi Apr 23, 2026
93cfd83
Merge branch 'dev' into feat/integrate-zenoh
leshy Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions dimos/core/coordination/module_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.module import ModuleBase, ModuleSpec
from dimos.core.resource import Resource
from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport
from dimos.core.transport import ZENOH_AVAILABLE, LCMTransport, PubSubTransport, pLCMTransport
from dimos.spec.utils import spec_annotation_compliance, spec_structural_compliance
from dimos.utils.generic import short_id
from dimos.utils.logging_config import setup_logger
Expand Down Expand Up @@ -542,7 +542,23 @@ def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> Pu

use_pickled = getattr(stream_type, "lcm_encode", None) is None
topic = f"/{name}" if _is_name_unique(blueprint, name) else f"/{short_id()}"
transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type)

if global_config.transport == "zenoh":
if not ZENOH_AVAILABLE:
raise RuntimeError(
"transport='zenoh' but eclipse-zenoh is not installed. "
"Install with: uv sync --extra zenoh"
)
from dimos.core.transport import ZenohTransport, pZenohTransport

zenoh_topic = f"dimos{topic}"
transport = (
pZenohTransport(zenoh_topic)
if use_pickled
else ZenohTransport(zenoh_topic, stream_type)
)
else:
transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type)

return transport

Expand Down Expand Up @@ -612,7 +628,8 @@ def _run_configurators(blueprint: Blueprint) -> None:
from dimos.protocol.service.system_configurator.base import configure_system
from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators

configurators = [*lcm_configurators(), *blueprint.configurator_checks]
lcm_checks = lcm_configurators() if global_config.transport == "lcm" else []
configurators = [*lcm_checks, *blueprint.configurator_checks]

try:
configure_system(configurators)
Expand Down
12 changes: 12 additions & 0 deletions dimos/core/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import platform
import re
from typing import Literal, TypeAlias

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from dimos.core.transport import ZENOH_AVAILABLE
from dimos.models.vl.types import VlModelName

ViewerBackend: TypeAlias = Literal["rerun", "rerun-web", "rerun-connect", "foxglove", "none"]
TransportBackend: TypeAlias = Literal["lcm", "zenoh"]


def _get_all_numbers(s: str) -> list[float]:
return [float(x) for x in re.findall(r"-?\d+\.?\d*", s)]


def _default_transport() -> TransportBackend:
if platform.system() == "Darwin" and ZENOH_AVAILABLE:
return "zenoh"
return "lcm"


class GlobalConfig(BaseSettings):
robot_ip: str | None = None
robot_ips: str | None = None
Expand All @@ -52,6 +62,7 @@ class GlobalConfig(BaseSettings):
nerf_speed: float = 1.0
planner_robot_speed: float | None = None
mcp_port: int = 9990
transport: TransportBackend = Field(default_factory=_default_transport)
dtop: bool = False
obstacle_avoidance: bool = True
detection_model: VlModelName = "moondream"
Expand All @@ -61,6 +72,7 @@ class GlobalConfig(BaseSettings):
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
validate_assignment=True,
)

def update(self, **kwargs: object) -> None:
Expand Down
43 changes: 43 additions & 0 deletions dimos/core/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared test helpers that must remain import-safe in minimal environments."""

from __future__ import annotations

from collections.abc import Callable
import threading


def retry_until(
event: threading.Event,
action: Callable[[], None],
timeout: float = 2.0,
interval: float = 0.01,
) -> None:
"""Retry an action until an Event fires.

Useful for async test paths where the producer may need to retry briefly
before the subscriber is fully ready.
"""
deadline = threading.Event()
timer = threading.Timer(timeout, deadline.set)
timer.start()
try:
while not event.is_set() and not deadline.is_set():
action()
event.wait(interval)
finally:
timer.cancel()
assert event.is_set(), f"Timed out after {timeout}s waiting for event"
Loading