replay: thread safe merge & free logs (#22196)

old-commit-hash: a1a0201069
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent 3bf834d4ec
commit 7100381a02
  1. 94
      selfdrive/ui/replay/replay.cc
  2. 11
      selfdrive/ui/replay/replay.h

@ -80,36 +80,37 @@ void Replay::addSegment(int n) {
t->start(); t->start();
} }
void Replay::removeSegment(int n) { void Replay::mergeEvents() {
// TODO: fix LogReader destructors const int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
/* const int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());
if (lrs.contains(n)) {
auto lr = lrs.take(n); // merge logs
delete lr; QMultiMap<uint64_t, Event *> *new_events = new QMultiMap<uint64_t, Event *>();
} std::unordered_map<uint32_t, EncodeIdx> *new_eidx = new std::unordered_map<uint32_t, EncodeIdx>[MAX_CAMERAS];
for (int i = start_idx; i <= end_idx; ++i) {
events_lock.lockForWrite(); if (auto it = lrs.find(i); it != lrs.end()) {
auto eit = events.begin(); *new_events += (*it)->events;
while (eit != events.end()) { for (CameraType cam_type : ALL_CAMERAS) {
if(std::abs(eit.key()/1e9 - getCurrentTime()/1e9) > 60.0){ new_eidx[cam_type].merge((*it)->eidx[cam_type]);
eit = events.erase(eit); }
continue;
} }
eit++;
}
events_lock.unlock();
*/
if (frs.contains(n)) {
auto fr = frs.take(n);
delete fr;
} }
}
void Replay::mergeEvents() { // update logs
LogReader *log = qobject_cast<LogReader *>(sender()); updating_events = true; // set updating_events to true to force stream thread relase the lock
events += log->events; lock.lock();
for (CameraType cam_type : ALL_CAMERAS) { auto prev_events = std::exchange(events, new_events);
eidx[cam_type].merge(log->eidx[cam_type]); auto prev_eidx = std::exchange(eidx, new_eidx);
lock.unlock();
// free logs
delete prev_events;
delete[] prev_eidx;
for (int i = 0; i < log_paths.size(); i++) {
if (i < start_idx || i > end_idx) {
delete lrs.take(i);
delete frs.take(i);
}
} }
} }
@ -139,18 +140,17 @@ void Replay::seekTime(int ts) {
seek_ts = ts; seek_ts = ts;
current_segment = ts/60; current_segment = ts/60;
updating_events = true;
} }
void Replay::segmentQueueThread() { void Replay::segmentQueueThread() {
// maintain the segment window // maintain the segment window
while (true) { while (true) {
int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());
for (int i = 0; i < log_paths.size(); i++) { for (int i = 0; i < log_paths.size(); i++) {
int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());
if (i >= start_idx && i <= end_idx) { if (i >= start_idx && i <= end_idx) {
addSegment(i); addSegment(i);
} else {
removeSegment(i);
} }
} }
QThread::msleep(100); QThread::msleep(100);
@ -196,8 +196,11 @@ void Replay::stream() {
timer.start(); timer.start();
route_start_ts = 0; route_start_ts = 0;
uint64_t cur_mono_time = 0;
while (true) { while (true) {
if (events.size() == 0) { std::unique_lock lk(lock);
if (!events || events->size() == 0) {
qDebug() << "waiting for events"; qDebug() << "waiting for events";
QThread::msleep(100); QThread::msleep(100);
continue; continue;
@ -205,40 +208,33 @@ void Replay::stream() {
// TODO: use initData's logMonoTime // TODO: use initData's logMonoTime
if (route_start_ts == 0) { if (route_start_ts == 0) {
route_start_ts = events.firstKey(); route_start_ts = events->firstKey();
} }
uint64_t t0 = route_start_ts + (seek_ts * 1e9); uint64_t t0 = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time;
seek_ts = -1; seek_ts = -1;
qDebug() << "unlogging at" << int((t0 - route_start_ts) / 1e9); qDebug() << "unlogging at" << int((t0 - route_start_ts) / 1e9);
// wait until we have events within 1s of the current time
auto eit = events.lowerBound(t0);
while (eit.key() - t0 > 1e9) {
eit = events.lowerBound(t0);
QThread::msleep(10);
}
uint64_t t0r = timer.nsecsElapsed(); uint64_t t0r = timer.nsecsElapsed();
while ((eit != events.end()) && seek_ts < 0) {
for (auto eit = events->lowerBound(t0); !updating_events && eit != events->end(); ++eit) {
cereal::Event::Reader e = (*eit)->event; cereal::Event::Reader e = (*eit)->event;
cur_mono_time = (*eit)->mono_time;
std::string type; std::string type;
KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(e).which()) { KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(e).which()) {
type = e_->getProto().getName(); type = e_->getProto().getName();
} }
uint64_t tm = e.getLogMonoTime(); current_ts = std::max(cur_mono_time - route_start_ts, (uint64_t)0) / 1e9;
current_ts = std::max(tm - route_start_ts, (uint64_t)0) / 1e9;
if (socks.contains(type)) { if (socks.contains(type)) {
float timestamp = (tm - route_start_ts)/1e9; float timestamp = (cur_mono_time - route_start_ts)/1e9;
if (std::abs(timestamp - last_print) > 5.0) { if (std::abs(timestamp - last_print) > 5.0) {
last_print = timestamp; last_print = timestamp;
qInfo() << "at " << int(last_print) << "s"; qInfo() << "at " << int(last_print) << "s";
} }
// keep time // keep time
long etime = tm-t0; long etime = cur_mono_time-t0;
long rtime = timer.nsecsElapsed() - t0r; long rtime = timer.nsecsElapsed() - t0r;
long us_behind = ((etime-rtime)*1e-3)+0.5; long us_behind = ((etime-rtime)*1e-3)+0.5;
if (us_behind > 0 && us_behind < 1e6) { if (us_behind > 0 && us_behind < 1e6) {
@ -287,8 +283,8 @@ void Replay::stream() {
sm->update_msgs(nanos_since_boot(), messages); sm->update_msgs(nanos_since_boot(), messages);
} }
} }
++eit;
} }
updating_events = false;
usleep(0);
} }
} }

@ -4,7 +4,6 @@
#include <termios.h> #include <termios.h>
#include <QJsonArray> #include <QJsonArray>
#include <QReadWriteLock>
#include <QThread> #include <QThread>
#include <capnp/dynamic.h> #include <capnp/dynamic.h>
@ -15,11 +14,9 @@
#include "selfdrive/ui/replay/filereader.h" #include "selfdrive/ui/replay/filereader.h"
#include "selfdrive/ui/replay/framereader.h" #include "selfdrive/ui/replay/framereader.h"
constexpr int FORWARD_SEGS = 2; constexpr int FORWARD_SEGS = 2;
constexpr int BACKWARD_SEGS = 2; constexpr int BACKWARD_SEGS = 2;
class Replay : public QObject { class Replay : public QObject {
Q_OBJECT Q_OBJECT
@ -28,7 +25,6 @@ public:
void start(); void start();
void addSegment(int n); void addSegment(int n);
void removeSegment(int n);
void seekTime(int ts); void seekTime(int ts);
public slots: public slots:
@ -50,9 +46,10 @@ private:
QThread *queue_thread; QThread *queue_thread;
// logs // logs
QMultiMap<uint64_t, Event*> events; std::mutex lock;
QReadWriteLock events_lock; std::atomic<bool> updating_events = false;
std::unordered_map<uint32_t, EncodeIdx> eidx[MAX_CAMERAS]; QMultiMap<uint64_t, Event *> *events = nullptr;
std::unordered_map<uint32_t, EncodeIdx> *eidx = nullptr;
HttpRequest *http; HttpRequest *http;
QJsonArray camera_paths; QJsonArray camera_paths;

Loading…
Cancel
Save