openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

337 lines
11 KiB

#include "tools/replay/replay.h"
#include <capnp/dynamic.h>
#include <csignal>
#include "cereal/services.h"
#include "common/params.h"
#include "tools/replay/util.h"
static void interrupt_sleep_handler(int signal) {}
// Helper function to notify events with safety checks
template <typename Callback, typename... Args>
void notifyEvent(Callback &callback, Args &&...args) {
if (callback) callback(std::forward<Args>(args)...);
}
Replay::Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block,
SubMaster *sm, uint32_t flags, const std::string &data_dir)
: sm_(sm), flags_(flags), seg_mgr_(std::make_unique<SegmentManager>(route, flags, data_dir)) {
std::signal(SIGUSR1, interrupt_sleep_handler);
if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
block.insert(block.end(), {"uiDebug", "userFlag"});
}
setupServices(allow, block);
setupSegmentManager(!allow.empty() || !block.empty());
}
void Replay::setupServices(const std::vector<std::string> &allow, const std::vector<std::string> &block) {
auto event_schema = capnp::Schema::from<cereal::Event>().asStruct();
sockets_.resize(event_schema.getUnionFields().size(), nullptr);
std::vector<const char *> active_services;
for (const auto &[name, _] : services) {
bool is_blocked = std::find(block.begin(), block.end(), name) != block.end();
bool is_allowed = allow.empty() || std::find(allow.begin(), allow.end(), name) != allow.end();
if (is_allowed && !is_blocked) {
uint16_t which = event_schema.getFieldByName(name).getProto().getDiscriminantValue();
sockets_[which] = name.c_str();
active_services.push_back(name.c_str());
}
}
rInfo("active services: %s", join(active_services, ", ").c_str());
if (!sm_) {
pm_ = std::make_unique<PubMaster>(active_services);
}
}
void Replay::setupSegmentManager(bool has_filters) {
seg_mgr_->setCallback([this]() { handleSegmentMerge(); });
if (has_filters) {
std::vector<bool> filters(sockets_.size(), false);
for (size_t i = 0; i < sockets_.size(); ++i) {
filters[i] = (i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
}
seg_mgr_->setFilters(filters);
}
qt replay (#20602) * initial commit, works * remove nui * working again * visionipc * cleanup * cleanup * moving VisionIpcServer to Unlogger class * works * tab cleanup * headless mode * headless mode works * working headless mode * gitignore update * small unlogger refactor * refactor param in UIState * works, very slow, hacks * cleanup * works * cleanup * cleanup * unused * works for whole route * nicer * a little nicer * different threshold * maintains 1 segment window * works with public api * comments * networkTimer works * cleanup * unified HttpRequest * tabs * tabs * comments' * gitignore * gitignore * only on PC * same line else * no changes in home.cc * scons * update scons * works * revert mainc.c * revert home * else * just api + problem with api send * works * include cleanup * general json fail * whitespace * remove active * adding request repeater * removing comments * tabs * update comment * cereal * fix * trailing new lines * grammar * if whitespace * indentation * Update selfdrive/ui/SConscript Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update selfdrive/ui/qt/request_repeater.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * works * sort by dir * no blockSignal * replay is now QOBject * cant take const char * rename inner it * get width and height from frame readeR * resolve TODO * seek in next pr * spaces * ui stuff * fix CI * remove comments * no repalce * trim segment fix * remove seek from stream * no cache key * final changes' * fix Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 19d962cdf37b80523deba6518057f2e860f65fee
4 years ago
}
Replay::~Replay() {
seg_mgr_.reset();
if (stream_thread_.joinable()) {
rInfo("shutdown: in progress...");
interruptStream([this]() {
exit_ = true;
return false;
});
stream_thread_.join();
rInfo("shutdown: done");
}
camera_server_.reset();
qt replay (#20602) * initial commit, works * remove nui * working again * visionipc * cleanup * cleanup * moving VisionIpcServer to Unlogger class * works * tab cleanup * headless mode * headless mode works * working headless mode * gitignore update * small unlogger refactor * refactor param in UIState * works, very slow, hacks * cleanup * works * cleanup * cleanup * unused * works for whole route * nicer * a little nicer * different threshold * maintains 1 segment window * works with public api * comments * networkTimer works * cleanup * unified HttpRequest * tabs * tabs * comments' * gitignore * gitignore * only on PC * same line else * no changes in home.cc * scons * update scons * works * revert mainc.c * revert home * else * just api + problem with api send * works * include cleanup * general json fail * whitespace * remove active * adding request repeater * removing comments * tabs * update comment * cereal * fix * trailing new lines * grammar * if whitespace * indentation * Update selfdrive/ui/SConscript Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update selfdrive/ui/qt/request_repeater.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * works * sort by dir * no blockSignal * replay is now QOBject * cant take const char * rename inner it * get width and height from frame readeR * resolve TODO * seek in next pr * spaces * ui stuff * fix CI * remove comments * no repalce * trim segment fix * remove seek from stream * no cache key * final changes' * fix Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 19d962cdf37b80523deba6518057f2e860f65fee
4 years ago
}
bool Replay::load() {
rInfo("loading route %s", seg_mgr_->route_.name().c_str());
if (!seg_mgr_->load()) return false;
qt replay (#20602) * initial commit, works * remove nui * working again * visionipc * cleanup * cleanup * moving VisionIpcServer to Unlogger class * works * tab cleanup * headless mode * headless mode works * working headless mode * gitignore update * small unlogger refactor * refactor param in UIState * works, very slow, hacks * cleanup * works * cleanup * cleanup * unused * works for whole route * nicer * a little nicer * different threshold * maintains 1 segment window * works with public api * comments * networkTimer works * cleanup * unified HttpRequest * tabs * tabs * comments' * gitignore * gitignore * only on PC * same line else * no changes in home.cc * scons * update scons * works * revert mainc.c * revert home * else * just api + problem with api send * works * include cleanup * general json fail * whitespace * remove active * adding request repeater * removing comments * tabs * update comment * cereal * fix * trailing new lines * grammar * if whitespace * indentation * Update selfdrive/ui/SConscript Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update selfdrive/ui/qt/request_repeater.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * works * sort by dir * no blockSignal * replay is now QOBject * cant take const char * rename inner it * get width and height from frame readeR * resolve TODO * seek in next pr * spaces * ui stuff * fix CI * remove comments * no repalce * trim segment fix * remove seek from stream * no cache key * final changes' * fix Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 19d962cdf37b80523deba6518057f2e860f65fee
4 years ago
min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60;
max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60;
return true;
}
void Replay::interruptStream(const std::function<bool()> &update_fn) {
if (stream_thread_.joinable() && stream_thread_id) {
pthread_kill(stream_thread_id, SIGUSR1); // Interrupt sleep in stream thread
}
{
interrupt_requested_ = true;
std::unique_lock lock(stream_lock_);
events_ready_ = update_fn();
interrupt_requested_ = user_paused_;
}
stream_cv_.notify_one();
}
void Replay::seekTo(double seconds, bool relative) {
double target_time = relative ? seconds + currentSeconds() : seconds;
target_time = std::max(0.0, target_time);
int target_segment = target_time / 60;
if (!seg_mgr_->hasSegment(target_segment)) {
rWarning("Invalid seek to %.2f s (segment %d)", target_time, target_segment);
return;
}
rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
notifyEvent(onSeeking, target_time);
interruptStream([&]() {
current_segment_.store(target_segment);
cur_mono_time_ = route_start_ts_ + target_time * 1e9;
cur_which_ = cereal::Event::Which::INIT_DATA;
seeking_to_.store(target_time, std::memory_order_relaxed);
return false;
});
seg_mgr_->setCurrentSegment(target_segment);
checkSeekProgress();
}
void Replay::checkSeekProgress() {
if (!seg_mgr_->getEventData()->isSegmentLoaded(current_segment_.load())) return;
double seek_to = seeking_to_.exchange(-1.0, std::memory_order_acquire);
if (seek_to >= 0 && onSeekedTo) {
onSeekedTo(seek_to);
}
// Resume the interrupted stream
interruptStream([]() { return true; });
}
void Replay::seekToFlag(FindFlag flag) {
if (auto next = timeline_.find(currentSeconds(), flag)) {
seekTo(*next - 2, false); // seek to 2 seconds before next
}
}
void Replay::pause(bool pause) {
if (user_paused_ != pause) {
interruptStream([=]() {
rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
user_paused_ = pause;
return !pause;
});
}
}
void Replay::handleSegmentMerge() {
if (exit_) return;
auto event_data = seg_mgr_->getEventData();
if (!stream_thread_.joinable() && !event_data->segments.empty()) {
startStream(event_data->segments.begin()->second);
}
notifyEvent(onSegmentsMerged);
// Interrupt the stream to handle segment merge
interruptStream([]() { return false; });
checkSeekProgress();
}
void Replay::startStream(const std::shared_ptr<Segment> segment) {
const auto &events = segment->log->events;
route_start_ts_ = events.front().mono_time;
cur_mono_time_ += route_start_ts_ - 1;
// get datetime from INIT_DATA, fallback to datetime in the route name
route_date_time_ = route().datetime();
auto it = std::find_if(events.cbegin(), events.cend(),
[](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
if (it != events.cend()) {
capnp::FlatArrayMessageReader reader(it->data);
auto event = reader.getRoot<cereal::Event>();
uint64_t wall_time = event.getInitData().getWallTimeNanos();
if (wall_time > 0) {
route_date_time_ = wall_time / 1e6;
}
}
// write CarParams
it = std::find_if(events.begin(), events.end(), [](const Event &e) { return e.which == cereal::Event::Which::CAR_PARAMS; });
if (it != events.end()) {
capnp::FlatArrayMessageReader reader(it->data);
auto event = reader.getRoot<cereal::Event>();
car_fingerprint_ = event.getCarParams().getCarFingerprint();
capnp::MallocMessageBuilder builder;
builder.setRoot(event.getCarParams());
auto words = capnp::messageToFlatArray(builder);
auto bytes = words.asBytes();
Params().put("CarParams", (const char *)bytes.begin(), bytes.size());
Params().put("CarParamsPersistent", (const char *)bytes.begin(), bytes.size());
} else {
rWarning("failed to read CarParams from current segment");
}
// start camera server
if (!hasFlag(REPLAY_FLAG_NO_VIPC)) {
std::pair<int, int> camera_size[MAX_CAMERAS] = {};
for (auto type : ALL_CAMERAS) {
if (auto &fr = segment->frames[type]) {
camera_size[type] = {fr->width, fr->height};
}
}
camera_server_ = std::make_unique<CameraServer>(camera_size);
}
timeline_.initialize(seg_mgr_->route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
[this](std::shared_ptr<LogReader> log) { notifyEvent(onQLogLoaded, log); });
stream_thread_ = std::thread(&Replay::streamThread, this);
}
void Replay::publishMessage(const Event *e) {
if (event_filter_ && event_filter_(e)) return;
if (!sm_) {
auto bytes = e->data.asBytes();
int ret = pm_->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
sockets_[e->which] = nullptr;
}
} else {
capnp::FlatArrayMessageReader reader(e->data);
auto event = reader.getRoot<cereal::Event>();
sm_->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
}
}
void Replay::publishFrame(const Event *e) {
CameraType cam;
switch (e->which) {
case cereal::Event::ROAD_ENCODE_IDX: cam = RoadCam; break;
case cereal::Event::DRIVER_ENCODE_IDX: cam = DriverCam; break;
case cereal::Event::WIDE_ROAD_ENCODE_IDX: cam = WideRoadCam; break;
default: return; // Invalid event type
}
if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
return; // Camera isdisabled
auto seg_it = event_data_->segments.find(e->eidx_segnum);
if (seg_it != event_data_->segments.end()) {
if (auto &frame = seg_it->second->frames[cam]; frame) {
camera_server_->pushFrame(cam, frame.get(), e);
}
}
}
void Replay::streamThread() {
stream_thread_id = pthread_self();
std::unique_lock lk(stream_lock_);
while (true) {
stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
if (exit_) break;
event_data_ = seg_mgr_->getEventData();
const auto &events = event_data_->events;
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which_, cur_mono_time_, {}));
if (first == events.cend()) {
rInfo("waiting for events...");
events_ready_ = false;
continue;
}
auto it = publishEvents(first, events.cend());
// Ensure frames are sent before unlocking to prevent race conditions
if (camera_server_) {
camera_server_->waitForSent();
}
if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
int last_segment = seg_mgr_->route_.segments().rbegin()->first;
if (event_data_->isSegmentLoaded(last_segment)) {
rInfo("reaches the end of route, restart from beginning");
stream_lock_.unlock();
seekTo(minSeconds(), false);
stream_lock_.lock();
}
}
}
qt replay (#20602) * initial commit, works * remove nui * working again * visionipc * cleanup * cleanup * moving VisionIpcServer to Unlogger class * works * tab cleanup * headless mode * headless mode works * working headless mode * gitignore update * small unlogger refactor * refactor param in UIState * works, very slow, hacks * cleanup * works * cleanup * cleanup * unused * works for whole route * nicer * a little nicer * different threshold * maintains 1 segment window * works with public api * comments * networkTimer works * cleanup * unified HttpRequest * tabs * tabs * comments' * gitignore * gitignore * only on PC * same line else * no changes in home.cc * scons * update scons * works * revert mainc.c * revert home * else * just api + problem with api send * works * include cleanup * general json fail * whitespace * remove active * adding request repeater * removing comments * tabs * update comment * cereal * fix * trailing new lines * grammar * if whitespace * indentation * Update selfdrive/ui/SConscript Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update selfdrive/ui/qt/request_repeater.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * works * sort by dir * no blockSignal * replay is now QOBject * cant take const char * rename inner it * get width and height from frame readeR * resolve TODO * seek in next pr * spaces * ui stuff * fix CI * remove comments * no repalce * trim segment fix * remove seek from stream * no cache key * final changes' * fix Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 19d962cdf37b80523deba6518057f2e860f65fee
4 years ago
}
std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::const_iterator first,
std::vector<Event>::const_iterator last) {
uint64_t evt_start_ts = cur_mono_time_;
uint64_t loop_start_ts = nanos_since_boot();
double prev_replay_speed = speed_;
for (; !interrupt_requested_ && first != last; ++first) {
const Event &evt = *first;
int segment = toSeconds(evt.mono_time) / 60;
if (current_segment_.load(std::memory_order_relaxed) != segment) {
current_segment_.store(segment, std::memory_order_relaxed);
seg_mgr_->setCurrentSegment(segment);
}
cur_mono_time_ = evt.mono_time;
cur_which_ = evt.which;
// Skip events if socket is not present
if (!sockets_[evt.which]) continue;
const uint64_t current_nanos = nanos_since_boot();
const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts);
// Reset timestamps for potential synchronization issues:
// - A negative time_diff may indicate slow execution or system wake-up,
// - A time_diff exceeding 1 second suggests a skipped segment.
if ((time_diff < -1e9 || time_diff >= 1e9) || speed_ != prev_replay_speed) {
evt_start_ts = evt.mono_time;
loop_start_ts = current_nanos;
prev_replay_speed = speed_;
} else if (time_diff > 0) {
precise_nano_sleep(time_diff, interrupt_requested_);
}
if (interrupt_requested_) break;
if (evt.eidx_segnum == -1) {
publishMessage(&evt);
} else if (camera_server_) {
if (speed_ > 1.0) {
camera_server_->waitForSent();
}
publishFrame(&evt);
}
}
return first;
}