Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
7a23d45
vis kickstart
leshy Mar 21, 2026
fdf9bd4
fix(memory2): address PR review — narrow exception catch, fix test bugs
leshy Mar 21, 2026
b0ad0bc
fix(typing): add @overload to embed/embed_text, fix conftest return t…
leshy Mar 21, 2026
607ee10
fix(typing): add @overload to remaining EmbeddingModel subclasses
leshy Mar 21, 2026
3d7becc
batch transform
leshy Mar 21, 2026
4ffc0ff
Merge branch 'dev' into feat/memory2-vis-kickstart
leshy Mar 23, 2026
c936444
checkpoint
leshy Mar 24, 2026
329f263
Merge branch 'dev' into feat/memory2-vis-kickstart
leshy Mar 24, 2026
423128f
fix(memory2): address PR review — delete empty file, add SQL injectio…
leshy Mar 24, 2026
da56191
Merge branch 'feat/memory2-vis-kickstart' into feat/memory2-svgvis
leshy Mar 25, 2026
2057960
transform module
leshy Mar 25, 2026
bac18cc
memory module experiment
leshy Mar 25, 2026
fb0b970
cleanup
leshy Mar 25, 2026
9ca9019
better API
leshy Mar 25, 2026
a71f914
cleanup
leshy Mar 25, 2026
ac838a2
small cleanup
leshy Mar 25, 2026
5e91c8c
memory2 module refactor
leshy Mar 25, 2026
b755dee
nullstore
leshy Mar 25, 2026
39206ab
tests fix
leshy Mar 25, 2026
8f5577b
null store
leshy Mar 26, 2026
4dd60c0
typing cleanup
leshy Mar 26, 2026
fb962be
Merge remote-tracking branch 'origin/dev' into feat/memory2-svgvis
leshy Mar 26, 2026
ae6de37
cleanup
leshy Mar 26, 2026
05fd764
mem module
leshy Mar 26, 2026
8e1e7f1
better store cleanup
leshy Mar 26, 2026
d04ade4
further shutdown cleanup
leshy Mar 26, 2026
338713e
correct live stream shutdown
leshy Mar 26, 2026
70980ff
null store tests extracted
leshy Mar 26, 2026
7a538d3
tests fix
leshy Mar 26, 2026
840f874
observation projection
leshy Mar 26, 2026
71fc3a9
Fix VoxelMap rename to VoxelMapTransformer and typo in VoxelGridMappe…
leshy Mar 26, 2026
3a19b4e
typo
leshy Mar 26, 2026
abb2fbf
Merge branch 'dev' into feat/memory2-svgvis
leshy Mar 26, 2026
331d821
Use CompositeResource.register_disposable() instead of direct _dispos…
leshy Mar 27, 2026
b84f96c
agent test writing guidelines
leshy Mar 27, 2026
b5c2ae8
test_null: test through NullStore API instead of ListObservationStore…
leshy Mar 27, 2026
a3ff4ab
Address PR #1682 review comments
leshy Mar 27, 2026
207e763
Rename _xf to _transform in Stream for readability
leshy Mar 27, 2026
b56517d
Fix flaky LCM test timeout and drone mock disposable error
leshy Mar 27, 2026
503e992
disposing review
leshy Mar 27, 2026
49c9b64
Add stream-level custody tests for Stream → Backend ownership
leshy Mar 27, 2026
51745e9
test cleanup
leshy Mar 27, 2026
1b5f3d4
Add agent code style guide with no-banner rule
leshy Mar 27, 2026
f135766
unbound stream tests belong in stream
leshy Mar 27, 2026
889d153
comemnts cleanup
leshy Mar 27, 2026
9aabcd3
small cleanup
leshy Mar 27, 2026
5fe848f
Wrap subscribe() callables in Disposable for register_disposable()
leshy Mar 27, 2026
07b6609
Merge branch 'dev' into feat/memory2-svgvis
leshy Mar 27, 2026
469da56
temporary test supression
leshy Mar 27, 2026
88aa58a
Merge branch 'feat/memory2-svgvis' of github.com:dimensionalOS/dimos …
leshy Mar 27, 2026
b0f04c9
obs.start bugfix
leshy Mar 27, 2026
ecca867
Merge remote-tracking branch 'origin/dev' into feat/memory2-svgvis
leshy Apr 5, 2026
6b5d649
Fix VoxelGrid.get_global_pointcloud: use _voxel_size not config.voxel…
leshy Apr 5, 2026
9c30023
Merge branch 'dev' into feat/memory2-svgvis
leshy Apr 5, 2026
345fe27
Merge branch 'dev' into feat/memory2-svgvis
leshy Apr 9, 2026
e9fead0
Merge branch 'feat/memory2-svgvis' of github.com:dimensionalOS/dimos …
leshy Apr 9, 2026
fae3715
Merge branch 'dev' into feat/memory2-svgvis
leshy Apr 9, 2026
5e41489
fix: align with dev's Configurable API and remove default_config
leshy Apr 9, 2026
f6271ba
removing accidentally added files
leshy Apr 9, 2026
147ca5e
CompositeResource owns the disposables
leshy Apr 9, 2026
c22cc01
fix: replace _disposables.add() with register_disposable()
leshy Apr 9, 2026
cd3a2db
Merge remote-tracking branch 'origin/dev' into feat/memory2-svgvis
leshy Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data/.lfs/go2_bigoffice.db.tar.gz
Git LFS file not shown
4 changes: 2 additions & 2 deletions dimos/agents/agent_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.agent.subscribe(self._on_agent_message)))
self._disposables.add(Disposable(self.agent_idle.subscribe(self._on_agent_idle)))
self.register_disposable(Disposable(self.agent.subscribe(self._on_agent_message)))
self.register_disposable(Disposable(self.agent_idle.subscribe(self._on_agent_idle)))
# Signal that subscription is ready
self._subscription_ready.set()

Expand Down
2 changes: 1 addition & 1 deletion dimos/agents/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def start(self) -> None:
def _on_human_input(string: str) -> None:
self._message_queue.put(HumanMessage(content=string))

self._disposables.add(Disposable(self.human_input.subscribe(_on_human_input)))
self.register_disposable(Disposable(self.human_input.subscribe(_on_human_input)))

@rpc
def on_system_modules(self, _modules: list[RPCClient]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion dimos/agents/skills/demo_robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DemoRobot(Module):

def start(self) -> None:
super().start()
self._disposables.add(interval(1.0).subscribe(lambda _: self._publish_gps_location()))
self.register_disposable(interval(1.0).subscribe(lambda _: self._publish_gps_location()))

def stop(self) -> None:
super().stop()
Expand Down
4 changes: 3 additions & 1 deletion dimos/agents/skills/google_maps_skill_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import json
from typing import Any

from reactivex.disposable import Disposable

from dimos.agents.annotation import skill
from dimos.core.core import rpc
from dimos.core.module import Module
Expand Down Expand Up @@ -49,7 +51,7 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.gps_location.subscribe(self._on_gps_location)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.gps_location.subscribe(self._on_gps_location)))

@rpc
def stop(self) -> None:
Expand Down
4 changes: 3 additions & 1 deletion dimos/agents/skills/gps_nav_skill.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import json

from reactivex.disposable import Disposable

from dimos.agents.annotation import skill
from dimos.core.core import rpc
from dimos.core.module import Module
Expand All @@ -38,7 +40,7 @@ class GpsNavSkillContainer(Module):
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.gps_location.subscribe(self._on_gps_location)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.gps_location.subscribe(self._on_gps_location)))

@rpc
def stop(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions dimos/agents/skills/navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.color_image.subscribe(self._on_color_image)))
self._disposables.add(Disposable(self.odom.subscribe(self._on_odom)))
self.register_disposable(Disposable(self.color_image.subscribe(self._on_color_image)))
self.register_disposable(Disposable(self.odom.subscribe(self._on_odom)))
self._skill_started = True

@rpc
Expand Down
4 changes: 3 additions & 1 deletion dimos/agents/skills/osm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.


from reactivex.disposable import Disposable

from dimos.agents.annotation import skill
from dimos.core.module import Module
from dimos.core.stream import In
Expand All @@ -39,7 +41,7 @@ def __init__(self) -> None:
def start(self) -> None:
super().start()
if hasattr(self.gps_location, "subscribe"):
self._disposables.add(self.gps_location.subscribe(self._on_gps_location)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.gps_location.subscribe(self._on_gps_location)))
else:
logger.warning(
"OsmSkill: gps_location stream does not support direct subscribe (RemoteIn)"
Expand Down
4 changes: 2 additions & 2 deletions dimos/agents/skills/person_follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.color_image.subscribe(self._on_color_image)))
self.register_disposable(Disposable(self.color_image.subscribe(self._on_color_image)))
if self.config.use_3d_navigation:
self._disposables.add(Disposable(self.global_map.subscribe(self._on_pointcloud)))
self.register_disposable(Disposable(self.global_map.subscribe(self._on_pointcloud)))

@rpc
def stop(self) -> None:
Expand Down
5 changes: 3 additions & 2 deletions dimos/agents/vlm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from langchain.chat_models import init_chat_model
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from reactivex.disposable import Disposable

from dimos.agents.system_prompt import SYSTEM_PROMPT
from dimos.core.core import rpc
Expand Down Expand Up @@ -60,8 +61,8 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type]
self._disposables.add(self.query_stream.subscribe(self._on_query)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.color_image.subscribe(self._on_image)))
self.register_disposable(Disposable(self.query_stream.subscribe(self._on_query)))

@rpc
def stop(self) -> None:
Expand Down
5 changes: 3 additions & 2 deletions dimos/agents/vlm_stream_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import time

from langchain_core.messages import AIMessage, HumanMessage
from reactivex.disposable import Disposable

from dimos.agents.vlm_agent_spec import VLMAgentSpec
from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT
Expand Down Expand Up @@ -62,8 +63,8 @@ def __init__(
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type]
self._disposables.add(self.answer_stream.subscribe(self._on_answer)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.color_image.subscribe(self._on_image)))
self.register_disposable(Disposable(self.answer_stream.subscribe(self._on_answer)))
self._worker = threading.Thread(target=self._run_queries, daemon=True)
self._worker.start()

Expand Down
4 changes: 2 additions & 2 deletions dimos/agents/web_human_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ def start(self) -> None:
# Subscribe to both text input sources
# 1. Direct text from web interface
unsub = self._web_interface.query_stream.subscribe(self._human_transport.publish)
self._disposables.add(unsub)
self.register_disposable(unsub)

# 2. Transcribed text from STT
unsub = stt_node.emit_text().subscribe(self._human_transport.publish)
self._disposables.add(unsub)
self.register_disposable(unsub)

self._thread = Thread(target=self._web_interface.run, daemon=True)
self._thread.start()
Expand Down
2 changes: 1 addition & 1 deletion dimos/core/coordination/_test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class AliceModule(Module):
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.greetings.subscribe(self._on_greetings)))
self.register_disposable(Disposable(self.greetings.subscribe(self._on_greetings)))

@rpc
def stop(self) -> None:
Expand Down
24 changes: 11 additions & 13 deletions dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
)

from pydantic import Field
from reactivex.disposable import CompositeDisposable

from dimos.core.core import T, rpc
from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.introspection.module.info import extract_module_info
from dimos.core.introspection.module.render import render_module_io
from dimos.core.resource import Resource
from dimos.core.resource import CompositeResource
from dimos.core.rpc_client import RpcCall
from dimos.core.stream import In, Out, RemoteOut, Transport
from dimos.protocol.rpc.pubsubrpc import LCMRPC
from dimos.protocol.rpc.spec import DEFAULT_RPC_TIMEOUT, DEFAULT_RPC_TIMEOUTS, RPCSpec
Expand Down Expand Up @@ -97,8 +97,9 @@ class _BlueprintPartial(Protocol):
def __call__(self, **kwargs: Any) -> "Blueprint": ...


class ModuleBase(Configurable, Resource):
class ModuleBase(Configurable, CompositeResource):
config: ModuleConfig

# Deployment target. Worker managers declare which deployment type they
# handle; the coordinator routes modules accordingly.
deployment: ClassVar[Deployment] = "python"
Expand All @@ -107,7 +108,7 @@ class ModuleBase(Configurable, Resource):
_tf: TFSpec | None = None
_loop: asyncio.AbstractEventLoop | None = None
_loop_thread: threading.Thread | None
_disposables: CompositeDisposable
_bound_rpc_calls: dict[str, RpcCall] = {}
_module_closed: bool = False
_module_closed_lock: threading.Lock
_loop_thread_timeout: float = 2.0
Expand All @@ -116,7 +117,6 @@ def __init__(self, config_args: dict[str, Any]) -> None:
super().__init__(**config_args)
self._module_closed_lock = threading.Lock()
self._loop, self._loop_thread = get_loop()
self._disposables = CompositeDisposable()
try:
self.rpc = self.config.rpc_transport( # type: ignore[call-arg]
rpc_timeouts=self.config.rpc_timeouts,
Expand Down Expand Up @@ -149,6 +149,7 @@ def start(self) -> None:

@rpc
def stop(self) -> None:
super().stop()
self._close_module()

def _close_module(self) -> None:
Expand All @@ -175,14 +176,12 @@ def _close_module(self) -> None:
if hasattr(self, "_tf") and self._tf is not None:
self._tf.stop()
self._tf = None
if hasattr(self, "_disposables"):
self._disposables.dispose()

# Break the In/Out -> owner -> self reference cycle so the instance
# can be freed by refcount instead of waiting for GC.
for attr in list(vars(self).values()):
if isinstance(attr, (In, Out)):
attr.owner = None
# Stop transports and break the In/Out -> owner -> self reference
# cycle so the instance can be freed by refcount instead of waiting for GC.
for attr in [*self.inputs.values(), *self.outputs.values()]:
attr.stop()
attr.owner = None

def _close_rpc(self) -> None:
if self.rpc:
Expand All @@ -205,7 +204,6 @@ def __setstate__(self, state) -> None: # type: ignore[no-untyped-def]
"""Restore object from pickled state."""
self.__dict__.update(state)
# Reinitialize runtime attributes
self._disposables = CompositeDisposable()
self._module_closed_lock = threading.Lock()
self._loop = None
self._loop_thread = None
Expand Down
25 changes: 13 additions & 12 deletions dimos/core/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from abc import abstractmethod
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypeVar

if sys.version_info >= (3, 11):
from typing import Self
Expand All @@ -29,6 +29,8 @@
from reactivex.abc import DisposableBase
from reactivex.disposable import CompositeDisposable

D = TypeVar("D", bound=DisposableBase)


class Resource(DisposableBase):
@abstractmethod
Expand Down Expand Up @@ -75,18 +77,17 @@ def __exit__(
class CompositeResource(Resource):
"""Resource that owns child disposables, disposed on stop()."""

_disposables: CompositeDisposable

def __init__(self) -> None:
self._disposables = CompositeDisposable()
_disposables: CompositeDisposable | None = None

def register_disposables(self, *disposables: DisposableBase) -> None:
"""Register child disposables to be disposed when this resource stops."""
for d in disposables:
self._disposables.add(d)
def register_disposable(self, disposable: D) -> D:
"""Register a child disposable to be disposed when this resource stops."""
if self._disposables is None:
Comment thread
leshy marked this conversation as resolved.
self._disposables = CompositeDisposable()
self._disposables.add(disposable)
return disposable
Comment thread
leshy marked this conversation as resolved.

def start(self) -> None:
pass
def start(self) -> None: ...

def stop(self) -> None:
self._disposables.dispose()
if self._disposables is not None:
self._disposables.dispose()
4 changes: 4 additions & 0 deletions dimos/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ def __str__(self) -> str:
+ ("" if not self._transport else " via " + str(self._transport))
)

def stop(self) -> None:
if self._transport is not None:
self._transport.stop()


class Out(Stream[T], ObservableMixin[T]):
_transport: Transport # type: ignore[type-arg]
Expand Down
4 changes: 2 additions & 2 deletions dimos/core/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _odom(msg) -> None:
self.mov.publish(msg.position)

unsub = self.odometry.subscribe(_odom)
self._disposables.add(Disposable(unsub))
self.register_disposable(Disposable(unsub))

def _lidar(msg) -> None:
self.lidar_msg_count += 1
Expand All @@ -57,7 +57,7 @@ def _lidar(msg) -> None:
print("RCV: unknown time", msg)

unsub = self.lidar.subscribe(_lidar)
self._disposables.add(Disposable(unsub))
self.register_disposable(Disposable(unsub))


def test_classmethods() -> None:
Expand Down
8 changes: 4 additions & 4 deletions dimos/experimental/security_demo/security_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.odom.subscribe(self._on_odom)))
self._disposables.add(
self.register_disposable(Disposable(self.odom.subscribe(self._on_odom)))
self.register_disposable(
Disposable(self.global_costmap.subscribe(self._router.handle_occupancy_grid))
)
self._disposables.add(Disposable(self.goal_reached.subscribe(self._on_goal_reached)))
self._disposables.add(Disposable(self.color_image.subscribe(self._on_color_image)))
self.register_disposable(Disposable(self.goal_reached.subscribe(self._on_goal_reached)))
self.register_disposable(Disposable(self.color_image.subscribe(self._on_color_image)))

self._depth_estimator.start()

Expand Down
4 changes: 2 additions & 2 deletions dimos/hardware/sensors/camera/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ def on_image(image: Image) -> None:
self.color_image.publish(image)
self._latest_image = image

self._disposables.add(
self.register_disposable(
stream.subscribe(on_image),
)

self._disposables.add(
self.register_disposable(
rx.interval(1.0).subscribe(lambda _: self.publish_metadata()),
)

Expand Down
4 changes: 2 additions & 2 deletions dimos/hardware/sensors/camera/realsense/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ def start(self) -> None:

if self.config.enable_pointcloud and self.config.enable_depth:
interval_sec = 1.0 / self.config.pointcloud_fps
self._disposables.add(
self.register_disposable(
backpressure(rx.interval(interval_sec)).subscribe(
on_next=lambda _: self._generate_pointcloud(),
on_error=lambda e: print(f"Pointcloud error: {e}"),
)
)

interval_sec = 1.0 / self.config.camera_info_fps
self._disposables.add(
self.register_disposable(
rx.interval(interval_sec).subscribe(
on_next=lambda _: self._publish_camera_info(),
on_error=lambda e: print(f"CameraInfo error: {e}"),
Expand Down
4 changes: 2 additions & 2 deletions dimos/hardware/sensors/camera/zed/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def start(self) -> None:
self._enable_tracking()

interval_sec = 1.0 / self.config.camera_info_fps
self._disposables.add(
self.register_disposable(
rx.interval(interval_sec).subscribe(
on_next=lambda _: self._publish_camera_info(),
on_error=lambda e: print(f"CameraInfo error: {e}"),
Expand All @@ -193,7 +193,7 @@ def start(self) -> None:

if self.config.enable_pointcloud and self.config.enable_depth:
interval_sec = 1.0 / self.config.pointcloud_fps
self._disposables.add(
self.register_disposable(
backpressure(rx.interval(interval_sec)).subscribe(
on_next=lambda _: self._generate_pointcloud(),
on_error=lambda e: print(f"Pointcloud error: {e}"),
Expand Down
Loading
Loading