Skip to content

Record replay#1708

Open
Dreamsorcerer wants to merge 10 commits intodevfrom
record-replay
Open

Record replay#1708
Dreamsorcerer wants to merge 10 commits intodevfrom
record-replay

Conversation

@Dreamsorcerer
Copy link
Copy Markdown
Collaborator

Problem

We want a recorder tool with the ability to record a selection of streams for modules using the new memory2 module, and the ability to replay those files.

Closes #1575

Solution

Added a RecordReplay class to handle the main functionality, a separate recorder UI, and appropriate adjustments to link it all together.

Also made the run() entrypoint async, so it is possible to migrate gradually to asyncio (probably needs some discussion).

How to Test

Run simulation or similar, while also running dimos recorder. A selection of streams should appear in the recorder. Select some streams with space and then use r to start/stop a recording.

Replay with --replay --replay-file=....

.coverage.*

# Created from simulation
MUJOCO_LOG.TXT
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we've been missing this for a long time


def start_recording(
self,
pubsubs: Collection[LCMPubSubBase],
Copy link
Copy Markdown
Contributor

@leshy leshy Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to be generic, idk what's the best interface to specify topics, but we should be able to take a subscription from any transport at this layer.

at this stage doesn't have to be a glob supporting pubsub, this is for discovery, all topics you want to record are known at this stage, so ros transport, shm should work here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, was just looking at updating this annotation. Complication is that something like AllPubSub doesn't have .start() method.

def _on_message(self, msg: bytes, topic: Topic) -> None:
stream_name = topic_to_stream_name(topic.pattern)

if self._topic_filter is not None and stream_name not in self._topic_filter:
Copy link
Copy Markdown
Contributor

@leshy leshy Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are doing subscribe_all then filtering in python which is too costly, at this stage python code already decoded LCM message and built a class for you just to throw it away, recorder will be used in actual robot modules with predefined topics, so we need efficient compute here..

this also makes me realize we might want to extend our pubsub base definition to support just watching for topics without subscribing/parsing so actual CLI recorder can be efficient, but can do it in a follow up

@leshy
Copy link
Copy Markdown
Contributor

leshy commented Apr 4, 2026

btw sharing if helpful, current module I use to hardcode implicit recording for specific robots

class Recorder(Module[RecorderConfig]):
    """Records all ``In`` ports to a memory2 SQLite database.

    Subclass with the topics you want to record::

        class MyRecorder(Recorder):
            color_image: In[Image]
            lidar: In[PointCloud2]

        blueprint.add(MyRecorder, db_path="session.db")
    """

    default_config = RecorderConfig

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._store: SqliteStore | None = None

    @rpc
    def start(self) -> None:
        super().start()

        self._store = self.register_disposable(SqliteStore(path=self.config.db_path))
        self._store.start()

        if not self.inputs:
            logger.warning("Recorder has no In ports — nothing to record, subclass the Recorder")
            return

        for name, port in self.inputs.items():
            stream: Stream[Any] = self._store.stream(name, port.type)
            unsub = port.subscribe(lambda msg, s=stream: s.append(msg))
            self.register_disposable(Disposable(unsub))
            logger.info("Recording %s (%s)", name, port.type.__name__)

    @rpc
    def stop(self) -> None:
        super().stop()

@dimensionalOS dimensionalOS deleted a comment from greptile-apps bot Apr 10, 2026
@leshy
Copy link
Copy Markdown
Contributor

leshy commented Apr 10, 2026

Can we get docs on how this is used? how do I test full functionality? I see recording to record can be set via CLI?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants