diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index f339676f65..fedd035323 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -80,36 +80,37 @@ void Replay::addSegment(int n) { t->start(); } -void Replay::removeSegment(int n) { - // TODO: fix LogReader destructors - /* - if (lrs.contains(n)) { - auto lr = lrs.take(n); - delete lr; - } - - events_lock.lockForWrite(); - auto eit = events.begin(); - while (eit != events.end()) { - if(std::abs(eit.key()/1e9 - getCurrentTime()/1e9) > 60.0){ - eit = events.erase(eit); - continue; +void Replay::mergeEvents() { + const int start_idx = std::max(current_segment - BACKWARD_SEGS, 0); + const int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size()); + + // merge logs + QMultiMap *new_events = new QMultiMap(); + std::unordered_map *new_eidx = new std::unordered_map[MAX_CAMERAS]; + for (int i = start_idx; i <= end_idx; ++i) { + if (auto it = lrs.find(i); it != lrs.end()) { + *new_events += (*it)->events; + for (CameraType cam_type : ALL_CAMERAS) { + new_eidx[cam_type].merge((*it)->eidx[cam_type]); + } } - eit++; - } - events_lock.unlock(); - */ - if (frs.contains(n)) { - auto fr = frs.take(n); - delete fr; } -} -void Replay::mergeEvents() { - LogReader *log = qobject_cast(sender()); - events += log->events; - for (CameraType cam_type : ALL_CAMERAS) { - eidx[cam_type].merge(log->eidx[cam_type]); + // update logs + updating_events = true; // set updating_events to true to force stream thread relase the lock + lock.lock(); + auto prev_events = std::exchange(events, new_events); + auto prev_eidx = std::exchange(eidx, new_eidx); + lock.unlock(); + + // free logs + delete prev_events; + delete[] prev_eidx; + for (int i = 0; i < log_paths.size(); i++) { + if (i < start_idx || i > end_idx) { + delete lrs.take(i); + delete frs.take(i); + } } } @@ -139,18 +140,17 @@ void Replay::seekTime(int ts) { seek_ts = ts; current_segment = ts/60; + updating_events = true; } void Replay::segmentQueueThread() { // maintain the segment window while (true) { + int start_idx = std::max(current_segment - BACKWARD_SEGS, 0); + int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size()); for (int i = 0; i < log_paths.size(); i++) { - int start_idx = std::max(current_segment - BACKWARD_SEGS, 0); - int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size()); if (i >= start_idx && i <= end_idx) { addSegment(i); - } else { - removeSegment(i); } } QThread::msleep(100); @@ -196,8 +196,11 @@ void Replay::stream() { timer.start(); route_start_ts = 0; + uint64_t cur_mono_time = 0; while (true) { - if (events.size() == 0) { + std::unique_lock lk(lock); + + if (!events || events->size() == 0) { qDebug() << "waiting for events"; QThread::msleep(100); continue; @@ -205,40 +208,33 @@ void Replay::stream() { // TODO: use initData's logMonoTime if (route_start_ts == 0) { - route_start_ts = events.firstKey(); + route_start_ts = events->firstKey(); } - uint64_t t0 = route_start_ts + (seek_ts * 1e9); + uint64_t t0 = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time; seek_ts = -1; qDebug() << "unlogging at" << int((t0 - route_start_ts) / 1e9); - - // wait until we have events within 1s of the current time - auto eit = events.lowerBound(t0); - while (eit.key() - t0 > 1e9) { - eit = events.lowerBound(t0); - QThread::msleep(10); - } - uint64_t t0r = timer.nsecsElapsed(); - while ((eit != events.end()) && seek_ts < 0) { + + for (auto eit = events->lowerBound(t0); !updating_events && eit != events->end(); ++eit) { cereal::Event::Reader e = (*eit)->event; + cur_mono_time = (*eit)->mono_time; std::string type; KJ_IF_MAYBE(e_, static_cast(e).which()) { type = e_->getProto().getName(); } - uint64_t tm = e.getLogMonoTime(); - current_ts = std::max(tm - route_start_ts, (uint64_t)0) / 1e9; + current_ts = std::max(cur_mono_time - route_start_ts, (uint64_t)0) / 1e9; if (socks.contains(type)) { - float timestamp = (tm - route_start_ts)/1e9; + float timestamp = (cur_mono_time - route_start_ts)/1e9; if (std::abs(timestamp - last_print) > 5.0) { last_print = timestamp; qInfo() << "at " << int(last_print) << "s"; } // keep time - long etime = tm-t0; + long etime = cur_mono_time-t0; long rtime = timer.nsecsElapsed() - t0r; long us_behind = ((etime-rtime)*1e-3)+0.5; if (us_behind > 0 && us_behind < 1e6) { @@ -287,8 +283,8 @@ void Replay::stream() { sm->update_msgs(nanos_since_boot(), messages); } } - - ++eit; } + updating_events = false; + usleep(0); } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index a6c9cfbb76..cb514acaad 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -4,7 +4,6 @@ #include #include -#include #include #include @@ -15,11 +14,9 @@ #include "selfdrive/ui/replay/filereader.h" #include "selfdrive/ui/replay/framereader.h" - constexpr int FORWARD_SEGS = 2; constexpr int BACKWARD_SEGS = 2; - class Replay : public QObject { Q_OBJECT @@ -28,7 +25,6 @@ public: void start(); void addSegment(int n); - void removeSegment(int n); void seekTime(int ts); public slots: @@ -50,9 +46,10 @@ private: QThread *queue_thread; // logs - QMultiMap events; - QReadWriteLock events_lock; - std::unordered_map eidx[MAX_CAMERAS]; + std::mutex lock; + std::atomic updating_events = false; + QMultiMap *events = nullptr; + std::unordered_map *eidx = nullptr; HttpRequest *http; QJsonArray camera_paths;