#include "tools/replay/replay.h" #include #include #include #include "cereal/services.h" #include "common/params.h" #include "common/timing.h" #include "system/hardware/hw.h" #include "tools/replay/util.h" Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) { std::vector s; auto event_struct = capnp::Schema::from().asStruct(); sockets_.resize(event_struct.getUnionFields().size()); for (const auto &it : services) { if ((allow.empty() || allow.contains(it.name)) && !block.contains(it.name)) { uint16_t which = event_struct.getFieldByName(it.name).getProto().getDiscriminantValue(); sockets_[which] = it.name; s.push_back(it.name); } } qDebug() << "services " << s; qDebug() << "loading route " << route; if (sm == nullptr) { pm = std::make_unique(s); } route_ = std::make_unique(route, data_dir); events_ = std::make_unique>(); new_events_ = std::make_unique>(); } Replay::~Replay() { stop(); } void Replay::stop() { if (!stream_thread_ && segments_.empty()) return; rInfo("shutdown: in progress..."); if (stream_thread_ != nullptr) { exit_ = updating_events_ = true; stream_cv_.notify_one(); stream_thread_->quit(); stream_thread_->wait(); stream_thread_ = nullptr; } segments_.clear(); camera_server_.reset(nullptr); timeline_future.waitForFinished(); rInfo("shutdown: done"); } bool Replay::load() { if (!route_->load()) { qCritical() << "failed to load route" << route_->name() << "from" << (route_->dir().isEmpty() ? "server" : route_->dir()); return false; } for (auto &[n, f] : route_->segments()) { bool has_log = !f.rlog.isEmpty() || !f.qlog.isEmpty(); bool has_video = !f.road_cam.isEmpty() || !f.qcamera.isEmpty(); if (has_log && (has_video || hasFlag(REPLAY_FLAG_NO_VIPC))) { segments_.insert({n, nullptr}); } } if (segments_.empty()) { qCritical() << "no valid segments in route" << route_->name(); return false; } rInfo("load route %s with %zu valid segments", qPrintable(route_->name()), segments_.size()); return true; } void Replay::start(int seconds) { seekTo(route_->identifier().segment_id * 60 + seconds, false); } void Replay::updateEvents(const std::function &lambda) { // set updating_events to true to force stream thread relase the lock and wait for evnets_udpated. updating_events_ = true; { std::unique_lock lk(stream_lock_); events_updated_ = lambda(); updating_events_ = false; } stream_cv_.notify_one(); } void Replay::seekTo(int seconds, bool relative) { seconds = relative ? seconds + currentSeconds() : seconds; updateEvents([&]() { seconds = std::max(0, seconds); int seg = seconds / 60; if (segments_.find(seg) == segments_.end()) { rWarning("can't seek to %d s segment %d is invalid", seconds, seg); return true; } rInfo("seeking to %d s, segment %d", seconds, seg); current_segment_ = seg; cur_mono_time_ = route_start_ts_ + seconds * 1e9; return isSegmentMerged(seg); }); queueSegment(); } void Replay::seekToFlag(FindFlag flag) { if (auto next = find(flag)) { seekTo(*next - 2, false); // seek to 2 seconds before next } } void Replay::buildTimeline() { uint64_t engaged_begin = 0; uint64_t alert_begin = 0; TimelineType alert_type = TimelineType::None; for (int i = 0; i < segments_.size() && !exit_; ++i) { LogReader log; if (!log.load(route_->at(i).qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue; for (const Event *e : log.events) { if (e->which == cereal::Event::Which::CONTROLS_STATE) { auto cs = e->event.getControlsState(); if (!engaged_begin && cs.getEnabled()) { engaged_begin = e->mono_time; } else if (engaged_begin && !cs.getEnabled()) { std::lock_guard lk(timeline_lock); timeline.push_back({toSeconds(engaged_begin), toSeconds(e->mono_time), TimelineType::Engaged}); engaged_begin = 0; } if (!alert_begin && cs.getAlertType().size() > 0) { alert_begin = e->mono_time; alert_type = TimelineType::AlertInfo; if (cs.getAlertStatus() != cereal::ControlsState::AlertStatus::NORMAL) { alert_type = cs.getAlertStatus() == cereal::ControlsState::AlertStatus::USER_PROMPT ? TimelineType::AlertWarning : TimelineType::AlertCritical; } } else if (alert_begin && cs.getAlertType().size() == 0) { std::lock_guard lk(timeline_lock); timeline.push_back({toSeconds(alert_begin), toSeconds(e->mono_time), alert_type}); alert_begin = 0; } } else if (e->which == cereal::Event::Which::USER_FLAG) { std::lock_guard lk(timeline_lock); timeline.push_back({toSeconds(e->mono_time), toSeconds(e->mono_time), TimelineType::UserFlag}); } } } } std::optional Replay::find(FindFlag flag) { int cur_ts = currentSeconds(); for (auto [start_ts, end_ts, type] : getTimeline()) { if (type == TimelineType::Engaged) { if (flag == FindFlag::nextEngagement && start_ts > cur_ts) { return start_ts; } else if (flag == FindFlag::nextDisEngagement && end_ts > cur_ts) { return end_ts; } } else if (type == TimelineType::UserFlag) { if (flag == FindFlag::nextUserFlag && start_ts > cur_ts) { return start_ts; } } } return std::nullopt; } void Replay::pause(bool pause) { updateEvents([=]() { rWarning("%s at %d s", pause ? "paused..." : "resuming", currentSeconds()); paused_ = pause; return true; }); } void Replay::setCurrentSegment(int n) { if (current_segment_.exchange(n) != n) { QMetaObject::invokeMethod(this, &Replay::queueSegment, Qt::QueuedConnection); } } void Replay::segmentLoadFinished(bool success) { if (!success) { Segment *seg = qobject_cast(sender()); rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num); segments_.erase(seg->seg_num); } queueSegment(); } void Replay::queueSegment() { if (segments_.empty()) return; SegmentMap::iterator cur, end; cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first)); for (int i = 0; end != segments_.end() && i <= FORWARD_SEGS; ++i) { ++end; } // load one segment at a time for (auto it = cur; it != end; ++it) { auto &[n, seg] = *it; if ((seg && !seg->isLoaded()) || !seg) { if (!seg) { rDebug("loading segment %d...", n); seg = std::make_unique(n, route_->at(n), flags_); QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); } break; } } const auto &cur_segment = cur->second; // merge the previous adjacent segment if it's loaded auto begin = segments_.find(cur_segment->seg_num - 1); if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) { begin = cur; } mergeSegments(begin, end); // free segments out of current semgnt window. std::for_each(segments_.begin(), begin, [](auto &e) { e.second.reset(nullptr); }); std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); }); // start stream thread if (stream_thread_ == nullptr && cur_segment->isLoaded()) { startStream(cur_segment.get()); emit streamStarted(); } } void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { // merge 3 segments in sequence. std::vector segments_need_merge; size_t new_events_size = 0; for (auto it = begin; it != end && it->second && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) { segments_need_merge.push_back(it->first); new_events_size += it->second->log->events.size(); } if (segments_need_merge != segments_merged_) { std::string s; for (int i = 0; i < segments_need_merge.size(); ++i) { s += std::to_string(segments_need_merge[i]); if (i != segments_need_merge.size() - 1) s += ", "; } rDebug("merge segments %s", s.c_str()); new_events_->clear(); new_events_->reserve(new_events_size); for (int n : segments_need_merge) { const auto &e = segments_[n]->log->events; auto middle = new_events_->insert(new_events_->end(), e.begin(), e.end()); std::inplace_merge(new_events_->begin(), middle, new_events_->end(), Event::lessThan()); } updateEvents([&]() { events_.swap(new_events_); segments_merged_ = segments_need_merge; return true; }); } } void Replay::startStream(const Segment *cur_segment) { const auto &events = cur_segment->log->events; // get route start time from initData auto it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); route_start_ts_ = it != events.end() ? (*it)->mono_time : events[0]->mono_time; cur_mono_time_ += route_start_ts_; // write CarParams it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; }); if (it != events.end()) { car_fingerprint_ = (*it)->event.getCarParams().getCarFingerprint(); capnp::MallocMessageBuilder builder; builder.setRoot((*it)->event.getCarParams()); auto words = capnp::messageToFlatArray(builder); auto bytes = words.asBytes(); Params().put("CarParams", (const char *)bytes.begin(), bytes.size()); } else { rWarning("failed to read CarParams from current segment"); } // start camera server if (!hasFlag(REPLAY_FLAG_NO_VIPC)) { std::pair camera_size[MAX_CAMERAS] = {}; for (auto type : ALL_CAMERAS) { if (auto &fr = cur_segment->frames[type]) { camera_size[type] = {fr->width, fr->height}; } } camera_server_ = std::make_unique(camera_size); } // start stream thread stream_thread_ = new QThread(); QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); }); QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater); stream_thread_->start(); timeline_future = QtConcurrent::run(this, &Replay::buildTimeline); } void Replay::publishMessage(const Event *e) { if (sm == nullptr) { auto bytes = e->bytes(); int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size()); if (ret == -1) { rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]); sockets_[e->which] = nullptr; } } else { sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], e->event}}); } } void Replay::publishFrame(const Event *e) { static const std::map cam_types{ {cereal::Event::ROAD_ENCODE_IDX, RoadCam}, {cereal::Event::DRIVER_ENCODE_IDX, DriverCam}, {cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam}, }; if ((e->which == cereal::Event::DRIVER_ENCODE_IDX && !hasFlag(REPLAY_FLAG_DCAM)) || (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !hasFlag(REPLAY_FLAG_ECAM))) { return; } auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs(); if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentMerged(eidx.getSegmentNum())) { CameraType cam = cam_types.at(e->which); camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx); } } void Replay::stream() { cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; std::unique_lock lk(stream_lock_); while (true) { stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); }); events_updated_ = false; if (exit_) break; Event cur_event(cur_which, cur_mono_time_); auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); if (eit == events_->end()) { rInfo("waiting for events..."); continue; } uint64_t evt_start_ts = cur_mono_time_; uint64_t loop_start_ts = nanos_since_boot(); for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) { const Event *evt = (*eit); cur_which = evt->which; cur_mono_time_ = evt->mono_time; setCurrentSegment(toSeconds(cur_mono_time_) / 60); // migration for pandaState -> pandaStates to keep UI working for old segments if (cur_which == cereal::Event::Which::PANDA_STATE_D_E_P_R_E_C_A_T_E_D && sockets_[cereal::Event::Which::PANDA_STATES] != nullptr) { MessageBuilder msg; auto ps = msg.initEvent().initPandaStates(1); ps[0].setIgnitionLine(true); ps[0].setPandaType(cereal::PandaState::PandaType::DOS); pm->send(sockets_[cereal::Event::Which::PANDA_STATES], msg); } if (cur_which < sockets_.size() && sockets_[cur_which] != nullptr) { // keep time long etime = cur_mono_time_ - evt_start_ts; long rtime = nanos_since_boot() - loop_start_ts; long behind_ns = etime - rtime; // if behind_ns is greater than 1 second, it means that an invalid segemnt is skipped by seeking/replaying if (behind_ns >= 1 * 1e9) { // reset start times evt_start_ts = cur_mono_time_; loop_start_ts = nanos_since_boot(); } else if (behind_ns > 0 && !hasFlag(REPLAY_FLAG_FULL_SPEED)) { precise_nano_sleep(behind_ns); } if (!evt->frame) { publishMessage(evt); } else if (camera_server_) { if (hasFlag(REPLAY_FLAG_FULL_SPEED)) { camera_server_->waitForSent(); } publishFrame(evt); } } } // wait for frame to be sent before unlock.(frameReader may be deleted after unlock) if (camera_server_) { camera_server_->waitForSent(); } if (eit == events_->end() && !hasFlag(REPLAY_FLAG_NO_LOOP)) { int last_segment = segments_.rbegin()->first; if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) { rInfo("reaches the end of route, restart from beginning"); QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, 0, false), Qt::QueuedConnection); } } } }