Sensor Storage and Replay
Record sensor streams to disk and replay them with their original timing. This is useful for testing, debugging, and creating reproducible datasets.
Quick Start
Recording
skip
Replaying
skip
TimedSensorStorage
Stores sensor data with timestamps as pickle files. Each frame is saved as 000.pickle, 001.pickle, etc.
skip
If a frame has a .raw_msg attribute, that attribute is pickled instead of the full object. You can customize this with the autocast parameter:
skip
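The selection rule described above could look roughly like the sketch below. It does not use the library: `payload_to_pickle` and `FakeLidarFrame` are hypothetical names, and the precedence (autocast first, then .raw_msg, then the frame itself) is an assumption rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class FakeLidarFrame:
    """Hypothetical frame type: a decoded wrapper that also keeps the raw message."""
    raw_msg: dict
    decoded_points: int


def payload_to_pickle(frame: Any, autocast: Optional[Callable[[Any], Any]] = None) -> Any:
    """Choose what would be pickled for a frame.

    Assumed precedence: autocast (if given), then .raw_msg, then the frame itself.
    """
    if autocast is not None:
        return autocast(frame)
    if hasattr(frame, "raw_msg"):
        return frame.raw_msg
    return frame


frame = FakeLidarFrame(raw_msg={"points": [1, 2, 3]}, decoded_points=3)
default_payload = payload_to_pickle(frame)
custom_payload = payload_to_pickle(frame, autocast=lambda f: f.decoded_points)
plain_payload = payload_to_pickle("plain string frame")
```

Storing the raw message rather than the wrapper keeps the pickles independent of the wrapper class, which is presumably why it is the default.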
TimedSensorReplay
Replays stored sensor data with timestamp-aware iteration and seeking.
Basic Iteration
skip
Realtime Playback
skip
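To make the idea of realtime playback concrete, here is a minimal standalone sketch that blocks between frames so the recorded gaps are reproduced. It is not the library's implementation: `replay_realtime` and its injectable `sleep` argument are hypothetical, and dividing the gap by `speed` is an assumption about how a speed factor would behave.

```python
import time
from typing import Any, Callable, Iterable, Iterator, Tuple

Frame = Tuple[float, Any]  # (timestamp, data)


def replay_realtime(
    frames: Iterable[Frame],
    speed: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[Any]:
    """Yield each frame's data, sleeping between frames to match the recorded gaps."""
    previous_ts = None
    for ts, data in frames:
        if previous_ts is not None:
            gap = (ts - previous_ts) / speed  # speed > 1 plays back faster
            if gap > 0:
                sleep(gap)
        previous_ts = ts
        yield data


recorded = [(100.0, "a"), (100.5, "b"), (102.0, "c")]
waits = []  # record the requested sleeps instead of actually blocking
played = list(replay_realtime(recorded, speed=2.0, sleep=waits.append))
```

Sleeping per gap is the simplest approach, but processing time between frames accumulates as drift. Scheduling each frame against a fixed start time would avoid that.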
Seeking and Slicing
skip
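Seeking by relative time can be pictured as filtering frames by their offset from the first timestamp. The sketch below is a standalone illustration under stated assumptions: `iterate_window` is a hypothetical helper, `seek` and `duration` are taken to be seconds, the window is treated as half-open, and frames are assumed to be in timestamp order.

```python
from typing import Any, Iterable, Iterator, Optional, Tuple

Frame = Tuple[float, Any]  # (timestamp, data)


def iterate_window(
    frames: Iterable[Frame],
    seek: float = 0.0,
    duration: Optional[float] = None,
) -> Iterator[Frame]:
    """Yield frames whose offset from the first timestamp is in [seek, seek + duration)."""
    first_ts = None
    for ts, data in frames:
        if first_ts is None:
            first_ts = ts
        offset = ts - first_ts
        if offset < seek:
            continue  # still before the requested start
        if duration is not None and offset >= seek + duration:
            break  # past the window; relies on frames being time-ordered
        yield ts, data


recorded = [(10.0, "a"), (11.0, "b"), (12.0, "c"), (13.0, "d")]
window = list(iterate_window(recorded, seek=1.0, duration=2.0))
```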
Finding Specific Frames
skip
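A nearest-timestamp lookup with a tolerance can be sketched with a binary search over sorted timestamps. This is an illustration only: `closest_frame` and `closest_frame_at_offset` are hypothetical standalone functions, and returning `None` when no frame falls within the tolerance is an assumption.

```python
from bisect import bisect_left
from typing import Any, List, Optional, Tuple

Frame = Tuple[float, Any]  # (timestamp, data)


def closest_frame(frames: List[Frame], timestamp: float, tolerance: Optional[float] = None) -> Optional[Any]:
    """Return the data nearest to an absolute timestamp, or None if outside tolerance."""
    if not frames:
        return None
    timestamps = [ts for ts, _ in frames]  # assumed sorted ascending
    i = bisect_left(timestamps, timestamp)
    # The nearest frame is either just before or at/after the insertion point.
    candidates = [j for j in (i - 1, i) if 0 <= j < len(frames)]
    best = min(candidates, key=lambda j: abs(timestamps[j] - timestamp))
    if tolerance is not None and abs(timestamps[best] - timestamp) > tolerance:
        return None
    return frames[best][1]


def closest_frame_at_offset(frames: List[Frame], relative_seconds: float, tolerance: Optional[float] = None) -> Optional[Any]:
    """Same lookup, with the target given as seconds after the first frame."""
    if not frames:
        return None
    return closest_frame(frames, frames[0][0] + relative_seconds, tolerance)


recorded = [(10.0, "a"), (11.0, "b"), (12.0, "c")]
near = closest_frame(recorded, 11.2)
too_far = closest_frame(recorded, 20.0, tolerance=1.0)
by_offset = closest_frame_at_offset(recorded, 1.9)
```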
Observable Stream
The .stream() method returns an Observable that emits frames with their original timing:
skip
Usage: Stub Connections for Testing
A common pattern is to create replay-based connection stubs for testing without hardware. This approach is fairly primitive: we would like to provide a higher-order API that records the full I/O of any module, but that is still a work in progress.
From robot/unitree/go2/connection.py:
skip
skip
Data Format
Each pickle file contains a tuple (timestamp, data):
- timestamp: Unix timestamp (float) when the frame was captured
- data: The sensor data (or the result of autocast, if provided)
Files are named sequentially: 000.pickle, 001.pickle, etc.
Recordings are stored in the data/ directory. See Data Loading for how data storage works, including Git LFS handling for large datasets.
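The tuple layout and file naming described in this section can be reproduced with the standard library alone, which is handy for inspecting a recording by hand. In the sketch below, `write_frame` and `read_frame` are hypothetical helpers, not library functions, and the three-digit zero padding is inferred from the example file names.

```python
import pickle
import tempfile
import time
from pathlib import Path


def write_frame(directory: Path, index: int, timestamp: float, data) -> Path:
    """Write one (timestamp, data) tuple to a zero-padded pickle file."""
    path = directory / f"{index:03d}.pickle"
    with path.open("wb") as f:
        pickle.dump((timestamp, data), f)
    return path


def read_frame(path: Path):
    """Read back the (timestamp, data) tuple stored in one frame file."""
    with path.open("rb") as f:
        timestamp, data = pickle.load(f)
    return timestamp, data


# Round-trip a single frame in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    frame_path = write_frame(Path(tmp), 0, time.time(), {"range_m": 1.25})
    frame_name = frame_path.name
    loaded_ts, loaded_data = read_frame(frame_path)
```

As with any pickle data, only load recordings from sources you trust, since unpickling can execute arbitrary code.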
API Reference
TimedSensorStorage
| Method | Description |
|---|---|
| save_one(frame) | Save a single frame, returns frame count |
| save(*frames) | Save multiple frames |
| save_stream(observable) | Pipe an observable through storage |
| consume_stream(observable) | Subscribe and save without returning |
TimedSensorReplay
| Method | Description |
|---|---|
| iterate(loop=False) | Iterate frames (no timing) |
| iterate_ts(seek, duration, loop) | Iterate with absolute timestamps |
| iterate_duration(...) | Iterate with relative timestamps |
| iterate_realtime(speed, ...) | Iterate with blocking to match timing |
| stream(speed, seek, duration, loop) | Observable with original timing |
| find_closest(timestamp, tolerance) | Find frame by absolute timestamp |
| find_closest_seek(relative_seconds, tolerance) | Find frame by relative time |
| first() | Get first frame |
| first_timestamp() | Get first timestamp |
| load(name) | Load specific frame by name/index |
