diff --git a/selfdrive/ui/SConscript b/selfdrive/ui/SConscript index 8b75dda19c..2d2b4cecf1 100644 --- a/selfdrive/ui/SConscript +++ b/selfdrive/ui/SConscript @@ -108,7 +108,7 @@ if GetOption('setup'): if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath()): qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] - replay_lib_src = ["replay/replay.cc", "replay/filereader.cc", "replay/framereader.cc"] + replay_lib_src = ["replay/replay.cc", "replay/filereader.cc", "replay/framereader.cc", "replay/route.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs) replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2'] + qt_libs diff --git a/selfdrive/ui/replay/filereader.cc b/selfdrive/ui/replay/filereader.cc index 31ccadf0cc..7d105570b7 100644 --- a/selfdrive/ui/replay/filereader.cc +++ b/selfdrive/ui/replay/filereader.cc @@ -1,7 +1,8 @@ #include "selfdrive/ui/replay/filereader.h" +#include #include -#include +#include "selfdrive/common/util.h" static bool decompressBZ2(std::vector &dest, const char srcData[], size_t srcSize, size_t outputSizeIncrement = 0x100000U) { @@ -25,85 +26,24 @@ static bool decompressBZ2(std::vector &dest, const char srcData[], size return ret == BZ_STREAM_END; } -// class FileReader - -FileReader::FileReader(const QString &fn, QObject *parent) : url_(fn), QObject(parent) {} - -void FileReader::read() { - if (url_.isLocalFile()) { - QFile file(url_.toLocalFile()); - if (file.open(QIODevice::ReadOnly)) { - emit finished(file.readAll()); - } else { - emit failed(QString("Failed to read file %1").arg(url_.toString())); - } - } else { - startHttpRequest(); - } -} - -void FileReader::startHttpRequest() { - QNetworkAccessManager *qnam = new QNetworkAccessManager(this); - QNetworkRequest request(url_); - request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true); - reply_ = qnam->get(request); - connect(reply_, &QNetworkReply::finished, [=]() { - if (!reply_->error()) { - emit finished(reply_->readAll()); - } else { - emit failed(reply_->errorString()); - } - reply_->deleteLater(); - reply_ = nullptr; - }); -} - -void FileReader::abort() { - if (reply_) reply_->abort(); -} - -// class LogReader - -LogReader::LogReader(const QString &file, QObject *parent) : QObject(parent) { - file_reader_ = new FileReader(file); - file_reader_->moveToThread(&thread_); - connect(&thread_, &QThread::started, file_reader_, &FileReader::read); - connect(&thread_, &QThread::finished, file_reader_, &FileReader::deleteLater); - connect(file_reader_, &FileReader::finished, [=](const QByteArray &dat) { - parseEvents(dat); - }); - connect(file_reader_, &FileReader::failed, [=](const QString &err) { - qDebug() << err; - }); - thread_.start(); -} - LogReader::~LogReader() { - // wait until thread is finished. - exit_ = true; - file_reader_->abort(); - thread_.quit(); - thread_.wait(); - - // clear events - for (auto e : events) { - delete e; - } + for (auto e : events) delete e; } -void LogReader::parseEvents(const QByteArray &dat) { +bool LogReader::load(const std::string &file) { raw_.resize(1024 * 1024 * 64); - if (!decompressBZ2(raw_, dat.data(), dat.size())) { - qWarning() << "bz2 decompress failed"; + std::string dat = util::read_file(file); + if (dat.empty() || !decompressBZ2(raw_, dat.data(), dat.size())) { + LOGW("bz2 decompress failed"); + return false; } auto insertEidx = [&](CameraType type, const cereal::EncodeIndex::Reader &e) { eidx[type][e.getFrameId()] = {e.getSegmentNum(), e.getSegmentId()}; }; - valid_ = true; kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); - while (!exit_ && words.size() > 0) { + while (words.size() > 0) { try { std::unique_ptr evt = std::make_unique(words); switch (evt->which) { @@ -120,13 +60,11 @@ void LogReader::parseEvents(const QByteArray &dat) { break; } words = kj::arrayPtr(evt->reader.getEnd(), words.end()); - events.insert(evt->mono_time, evt.release()); + events.push_back(evt.release()); } catch (const kj::Exception &e) { - valid_ = false; - break; + return false; } } - if (!exit_) { - emit finished(valid_); - } + std::sort(events.begin(), events.end(), Event::lessThan()); + return true; } diff --git a/selfdrive/ui/replay/filereader.h b/selfdrive/ui/replay/filereader.h index 8393eedfef..cca8ef6216 100644 --- a/selfdrive/ui/replay/filereader.h +++ b/selfdrive/ui/replay/filereader.h @@ -3,46 +3,23 @@ #include #include -#include -#include -#include -#include -#include - #include - #include "cereal/gen/cpp/log.capnp.h" - #include "selfdrive/camerad/cameras/camera_common.h" -class FileReader : public QObject { - Q_OBJECT - -public: - FileReader(const QString &fn, QObject *parent = nullptr); - void read(); - void abort(); - -signals: - void finished(const QByteArray &dat); - void failed(const QString &err); - -private: - void startHttpRequest(); - QNetworkReply *reply_ = nullptr; - QUrl url_; -}; - const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; const int MAX_CAMERAS = std::size(ALL_CAMERAS); - struct EncodeIdx { int segmentNum; uint32_t frameEncodeId; }; - class Event { public: + Event(cereal::Event::Which which, uint64_t mono_time) : reader(kj::ArrayPtr{}) { + // construct a dummy Event for binary search, e.g std::upper_bound + this->which = which; + this->mono_time = mono_time; + } Event(const kj::ArrayPtr &amsg) : reader(amsg) { words = kj::ArrayPtr(amsg.begin(), reader.getEnd()); event = reader.getRoot(); @@ -51,6 +28,12 @@ public: } inline kj::ArrayPtr bytes() const { return words.asBytes(); } + struct lessThan { + inline bool operator()(const Event *l, const Event *r) { + return l->mono_time < r->mono_time || (l->mono_time == r->mono_time && l->which < r->which); + } + }; + uint64_t mono_time; cereal::Event::Which which; cereal::Event::Reader event; @@ -58,27 +41,15 @@ public: kj::ArrayPtr words; }; -class LogReader : public QObject { - Q_OBJECT - +class LogReader { public: - LogReader(const QString &file, QObject *parent = nullptr); + LogReader() = default; ~LogReader(); - inline bool valid() const { return valid_; } + bool load(const std::string &file); - QMultiMap events; + std::vector events; std::unordered_map eidx[MAX_CAMERAS] = {}; -signals: - void finished(bool success); - private: - void parseEvents(const QByteArray &dat); - - std::atomic exit_ = false; - std::atomic valid_ = false; std::vector raw_; - - FileReader *file_reader_ = nullptr; - QThread thread_; }; diff --git a/selfdrive/ui/replay/framereader.cc b/selfdrive/ui/replay/framereader.cc index 1e8d1de17a..ea5407039b 100644 --- a/selfdrive/ui/replay/framereader.cc +++ b/selfdrive/ui/replay/framereader.cc @@ -1,9 +1,6 @@ #include "selfdrive/ui/replay/framereader.h" -#include - #include -#include "selfdrive/common/timing.h" static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { std::mutex *mutex = (std::mutex *)*arg; @@ -35,7 +32,7 @@ public: ~AVInitializer() { avformat_network_deinit(); } }; -FrameReader::FrameReader(const std::string &url, int timeout_sec) : url_(url), timeout_(timeout_sec) { +FrameReader::FrameReader() { static AVInitializer av_initializer; } @@ -74,24 +71,14 @@ FrameReader::~FrameReader() { } } -int FrameReader::check_interrupt(void *p) { - FrameReader *fr = static_cast(p); - return fr->exit_ || (fr->timeout_ > 0 && millis_since_boot() > fr->timeout_ms_); -} - -bool FrameReader::process() { +bool FrameReader::load(const std::string &url) { pFormatCtx_ = avformat_alloc_context(); - pFormatCtx_->interrupt_callback.callback = &FrameReader::check_interrupt; - pFormatCtx_->interrupt_callback.opaque = (void *)this; - if (timeout_ > 0) { - timeout_ms_ = millis_since_boot() + timeout_ * 1000; - } - if (avformat_open_input(&pFormatCtx_, url_.c_str(), NULL, NULL) != 0) { - printf("error loading %s\n", url_.c_str()); + if (avformat_open_input(&pFormatCtx_, url.c_str(), NULL, NULL) != 0) { + printf("error loading %s\n", url.c_str()); return false; } avformat_find_stream_info(pFormatCtx_, NULL); - av_dump_format(pFormatCtx_, 0, url_.c_str(), 0); + av_dump_format(pFormatCtx_, 0, url.c_str(), 0); auto pCodecCtxOrig = pFormatCtx_->streams[0]->codec; auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id); diff --git a/selfdrive/ui/replay/framereader.h b/selfdrive/ui/replay/framereader.h index 84a7995870..a66e401d7c 100644 --- a/selfdrive/ui/replay/framereader.h +++ b/selfdrive/ui/replay/framereader.h @@ -19,9 +19,9 @@ extern "C" { class FrameReader { public: - FrameReader(const std::string &url, int timeout_sec = 0); + FrameReader(); ~FrameReader(); - bool process(); + bool load(const std::string &url); uint8_t *get(int idx); int getRGBSize() const { return width * height * 3; } size_t getFrameCount() const { return frames_.size(); } @@ -32,7 +32,6 @@ public: private: void decodeThread(); uint8_t *decodeFrame(AVPacket *pkt); - static int check_interrupt(void *p); struct Frame { AVPacket pkt = {}; uint8_t *data = nullptr; @@ -52,8 +51,5 @@ private: int decode_idx_ = 0; std::atomic exit_ = false; bool valid_ = false; - std::string url_; std::thread decode_thread_; - int timeout_ = 0; - double timeout_ms_ = 0; }; diff --git a/selfdrive/ui/replay/main.cc b/selfdrive/ui/replay/main.cc index a7a6b843af..94323b29dc 100644 --- a/selfdrive/ui/replay/main.cc +++ b/selfdrive/ui/replay/main.cc @@ -1,5 +1,6 @@ #include "selfdrive/ui/replay/replay.h" +#include #include #include @@ -67,6 +68,7 @@ int main(int argc, char *argv[]){ parser.addPositionalArgument("route", "the drive to replay. find your drives at connect.comma.ai"); parser.addOption({{"a", "allow"}, "whitelist of services to send", "allow"}); parser.addOption({{"b", "block"}, "blacklist of services to send", "block"}); + parser.addOption({{"s", "start"}, "start from ", "seconds"}); parser.addOption({"demo", "use a demo route instead of providing your own"}); parser.process(a); @@ -79,7 +81,7 @@ int main(int argc, char *argv[]){ QStringList allow = parser.value("allow").isEmpty() ? QStringList{} : parser.value("allow").split(","); QStringList block = parser.value("block").isEmpty() ? QStringList{} : parser.value("block").split(","); Replay *replay = new Replay(route, allow, block); - replay->start(); + replay->start(parser.value("start").toInt()); // start keyboard control thread QThread *t = QThread::create(keyboardThread, replay); diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 06a8ef9837..931a711812 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -1,7 +1,6 @@ #include "selfdrive/ui/replay/replay.h" -#include -#include +#include #include "cereal/services.h" #include "selfdrive/camerad/cameras/camera_common.h" @@ -14,7 +13,7 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s if ((allow.size() == 0 || allow.contains(it.name)) && !block.contains(it.name)) { s.push_back(it.name); - socks.append(std::string(it.name)); + socks.insert(it.name); } } qDebug() << "services " << s; @@ -23,184 +22,196 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s pm = new PubMaster(s); } - const QString url = CommaApi::BASE_URL + "/v1/route/" + route + "/files"; - http = new HttpRequest(this, !Hardware::PC()); - QObject::connect(http, &HttpRequest::receivedResponse, this, &Replay::parseResponse); - http->sendRequest(url); + route_ = std::make_unique(route); + events = new std::vector(); + // queueSegment is always executed in the main thread + connect(this, &Replay::segmentChanged, this, &Replay::queueSegment); } -void Replay::parseResponse(const QString &response) { - QJsonDocument doc = QJsonDocument::fromJson(response.trimmed().toUtf8()); - if (doc.isNull()) { - qDebug() << "JSON Parse failed"; - return; - } - - camera_paths = doc["cameras"].toArray(); - log_paths = doc["logs"].toArray(); - - seekTo(0); +Replay::~Replay() { + // TODO: quit stream thread and free resources. } -void Replay::addSegment(int n) { - assert((n >= 0) && (n < log_paths.size()) && (n < camera_paths.size())); - if (lrs.find(n) != lrs.end()) { +void Replay::start(int seconds){ + // load route + if (!route_->load() || route_->size() == 0) { + qDebug() << "failed load route" << route_->name() << "from server"; return; } - lrs[n] = new LogReader(log_paths.at(n).toString()); - // this is a queued connection, mergeEvents is executed in the main thread. - QObject::connect(lrs[n], &LogReader::finished, this, &Replay::mergeEvents); - - frs[n] = new FrameReader(qPrintable(camera_paths.at(n).toString())); - QThread * t = QThread::create([=]() { frs[n]->process(); }); - QObject::connect(t, &QThread::finished, t, &QThread::deleteLater); - t->start(); -} - -void Replay::mergeEvents() { - const int start_idx = std::max(current_segment - BACKWARD_SEGS, 0); - const int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size()); - - // merge logs - QMultiMap *new_events = new QMultiMap(); - std::unordered_map *new_eidx = new std::unordered_map[MAX_CAMERAS]; - for (int i = start_idx; i <= end_idx; ++i) { - if (auto it = lrs.find(i); it != lrs.end()) { - *new_events += (*it)->events; - for (CameraType cam_type : ALL_CAMERAS) { - new_eidx[cam_type].insert((*it)->eidx[cam_type].begin(), (*it)->eidx[cam_type].end()); - } - } - } - - // update logs - updating_events = true; // set updating_events to true to force stream thread relase the lock - lock.lock(); - auto prev_events = std::exchange(events, new_events); - auto prev_eidx = std::exchange(eidx, new_eidx); - updating_events = false; - lock.unlock(); - - // free logs - delete prev_events; - delete[] prev_eidx; - for (int i = 0; i < log_paths.size(); i++) { - if (i < start_idx || i > end_idx) { - delete lrs.take(i); - delete frs.take(i); - } - } -} + qDebug() << "load route" << route_->name() << route_->size() << "segments, start from" << seconds; + segments.resize(route_->size()); + seekTo(seconds); -void Replay::start(){ + // start stream thread thread = new QThread; - QObject::connect(thread, &QThread::started, [=](){ - stream(); - }); + QObject::connect(thread, &QThread::started, [=]() { stream(); }); thread->start(); - - queue_thread = new QThread; - QObject::connect(queue_thread, &QThread::started, [=](){ - segmentQueueThread(); - }); - queue_thread->start(); } void Replay::seekTo(int seconds) { + if (segments.empty()) return; + updating_events = true; std::unique_lock lk(lock); - seconds = std::clamp(seconds, 0, log_paths.size() * 60); + seconds = std::clamp(seconds, 0, (int)segments.size() * 60); qInfo() << "seeking to " << seconds; seek_ts = seconds; - current_segment = seconds / 60; + setCurrentSegment(std::clamp(seconds / 60, 0, (int)segments.size() - 1)); updating_events = false; } void Replay::relativeSeek(int seconds) { - if (current_ts > 0) { - seekTo(current_ts + seconds); + seekTo(current_ts + seconds); +} + +void Replay::setCurrentSegment(int n) { + if (current_segment.exchange(n) != n) { + emit segmentChanged(n); } } -void Replay::segmentQueueThread() { - // maintain the segment window - while (true) { - int start_idx = std::max(current_segment - BACKWARD_SEGS, 0); - int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size()); - for (int i = 0; i < log_paths.size(); i++) { - if (i >= start_idx && i <= end_idx) { - addSegment(i); +// maintain the segment window +void Replay::queueSegment() { + assert(QThread::currentThreadId() == qApp->thread()->currentThreadId()); + + // fetch segments forward + int cur_seg = current_segment.load(); + int end_idx = cur_seg; + for (int i = cur_seg, fwd = 0; i < segments.size() && fwd <= FORWARD_SEGS; ++i) { + if (!segments[i]) { + segments[i] = std::make_unique(i, route_->at(i)); + QObject::connect(segments[i].get(), &Segment::loadFinished, this, &Replay::queueSegment); + } + end_idx = i; + // skip invalid segment + fwd += segments[i]->isValid(); + } + + // merge segments + mergeSegments(cur_seg, end_idx); +} + +void Replay::mergeSegments(int cur_seg, int end_idx) { + // segments must be merged in sequence. + std::vector segments_need_merge; + const int begin_idx = std::max(cur_seg - BACKWARD_SEGS, 0); + for (int i = begin_idx; i <= end_idx; ++i) { + if (segments[i] && segments[i]->isLoaded()) { + segments_need_merge.push_back(i); + } else if (i >= cur_seg) { + // segment is valid,but still loading. can't skip it to merge the next one. + // otherwise the stream thread may jump to the next segment. + break; + } + } + + if (segments_need_merge != segments_merged) { + qDebug() << "merge segments" << segments_need_merge; + segments_merged = segments_need_merge; + + std::vector *new_events = new std::vector(); + std::unordered_map *new_eidx = new std::unordered_map[MAX_CAMERAS]; + for (int n : segments_need_merge) { + auto &log = segments[n]->log; + // merge & sort events + auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end()); + std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); + for (CameraType cam_type : ALL_CAMERAS) { + new_eidx[cam_type].insert(log->eidx[cam_type].begin(), log->eidx[cam_type].end()); + } + } + + // update logs + // set updating_events to true to force stream thread relase the lock + updating_events = true; + lock.lock(); + + if (route_start_ts == 0) { + // get route start time from initData + auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); + if (it != new_events->end()) { + route_start_ts = (*it)->mono_time; + } + } + + auto prev_events = std::exchange(events, new_events); + auto prev_eidx = std::exchange(eidx, new_eidx); + updating_events = false; + + lock.unlock(); + + // free segments + delete prev_events; + delete[] prev_eidx; + for (int i = 0; i < segments.size(); i++) { + if ((i < begin_idx || i > end_idx) && segments[i]) { + segments[i].reset(nullptr); } } - QThread::msleep(100); } } void Replay::stream() { - QElapsedTimer timer; - timer.start(); - - route_start_ts = 0; + bool waiting_printed = false; uint64_t cur_mono_time = 0; + cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; + while (true) { std::unique_lock lk(lock); - if (!events || events->size() == 0) { - lk.unlock(); - qDebug() << "waiting for events"; - QThread::msleep(100); + uint64_t evt_start_ts = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time; + Event cur_event(cur_which, evt_start_ts); + auto eit = std::upper_bound(events->begin(), events->end(), &cur_event, Event::lessThan()); + if (eit == events->end()) { + lock.unlock(); + if (std::exchange(waiting_printed, true) == false) { + qDebug() << "waiting for events..."; + } + QThread::msleep(50); continue; } - - // TODO: use initData's logMonoTime - if (route_start_ts == 0) { - route_start_ts = events->firstKey(); - } - - uint64_t t0 = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time; + waiting_printed = false; seek_ts = -1; - qDebug() << "unlogging at" << int((t0 - route_start_ts) / 1e9); - uint64_t t0r = timer.nsecsElapsed(); + uint64_t loop_start_ts = nanos_since_boot(); + qDebug() << "unlogging at" << int((evt_start_ts - route_start_ts) / 1e9); + + for (/**/; !updating_events && eit != events->end(); ++eit) { + const Event *evt = (*eit); + cur_which = evt->which; + cur_mono_time = evt->mono_time; + current_ts = (cur_mono_time - route_start_ts) / 1e9; - for (auto eit = events->lowerBound(t0); !updating_events && eit != events->end(); ++eit) { - cereal::Event::Reader e = (*eit)->event; - cur_mono_time = (*eit)->mono_time; - current_segment = (cur_mono_time - route_start_ts) / 1e9 / 60; std::string type; - KJ_IF_MAYBE(e_, static_cast(e).which()) { + KJ_IF_MAYBE(e_, static_cast(evt->event).which()) { type = e_->getProto().getName(); } - current_ts = std::max(cur_mono_time - route_start_ts, (uint64_t)0) / 1e9; - - if (socks.contains(type)) { - float timestamp = (cur_mono_time - route_start_ts)/1e9; - if (std::abs(timestamp - last_print) > 5.0) { - last_print = timestamp; + if (socks.find(type) != socks.end()) { + if (std::abs(current_ts - last_print) > 5.0) { + last_print = current_ts; qInfo() << "at " << int(last_print) << "s"; } + setCurrentSegment(current_ts / 60); // keep time - long etime = cur_mono_time-t0; - long rtime = timer.nsecsElapsed() - t0r; - long us_behind = ((etime-rtime)*1e-3)+0.5; + long etime = cur_mono_time - evt_start_ts; + long rtime = nanos_since_boot() - loop_start_ts; + long us_behind = ((etime - rtime) * 1e-3) + 0.5; if (us_behind > 0 && us_behind < 1e6) { QThread::usleep(us_behind); - //qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed(); } // publish frame // TODO: publish all frames - if (type == "roadCameraState") { - auto fr = e.getRoadCameraState(); - - auto it_ = eidx[RoadCam].find(fr.getFrameId()); + if (evt->which == cereal::Event::ROAD_CAMERA_STATE) { + auto it_ = eidx[RoadCam].find(evt->event.getRoadCameraState().getFrameId()); if (it_ != eidx[RoadCam].end()) { EncodeIdx &e = it_->second; - if (frs.find(e.segmentNum) != frs.end()) { - auto frm = frs[e.segmentNum]; + auto &seg = segments[e.segmentNum]; + if (seg && seg->isLoaded()) { + auto &frm = seg->frames[RoadCam]; if (vipc_server == nullptr) { cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); @@ -224,12 +235,10 @@ void Replay::stream() { // publish msg if (sm == nullptr) { - auto bytes = (*eit)->bytes(); + auto bytes = evt->bytes(); pm->send(type.c_str(), (capnp::byte *)bytes.begin(), bytes.size()); } else { - std::vector> messages; - messages.push_back({type, e}); - sm->update_msgs(nanos_since_boot(), messages); + sm->update_msgs(nanos_since_boot(), {{type, evt->event}}); } } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 7a1ea13c0e..be54152319 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -1,17 +1,11 @@ #pragma once -#include - -#include #include +#include #include - #include "cereal/visionipc/visionipc_server.h" -#include "selfdrive/common/util.h" -#include "selfdrive/ui/qt/api.h" -#include "selfdrive/ui/replay/filereader.h" -#include "selfdrive/ui/replay/framereader.h" +#include "selfdrive/ui/replay/route.h" constexpr int FORWARD_SEGS = 2; constexpr int BACKWARD_SEGS = 2; @@ -21,45 +15,43 @@ class Replay : public QObject { public: Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr, QObject *parent = 0); + ~Replay(); - void start(); - void addSegment(int n); + void start(int seconds = 0); void relativeSeek(int seconds); void seekTo(int seconds); - void stream(); - void segmentQueueThread(); +signals: + void segmentChanged(int); -public slots: - void parseResponse(const QString &response); - void mergeEvents(); +protected slots: + void queueSegment(); + +protected: + void stream(); + void setCurrentSegment(int n); + void mergeSegments(int begin_idx, int end_idx); -private: float last_print = 0; - uint64_t route_start_ts; + uint64_t route_start_ts = 0; std::atomic seek_ts = 0; std::atomic current_ts = 0; - std::atomic current_segment = 0; + std::atomic current_segment = -1; QThread *thread; - QThread *kb_thread; - QThread *queue_thread; // logs std::mutex lock; std::atomic updating_events = false; - QMultiMap *events = nullptr; + std::vector *events = nullptr; std::unordered_map *eidx = nullptr; - - HttpRequest *http; - QJsonArray camera_paths; - QJsonArray log_paths; - QMap lrs; - QMap frs; + std::vector> segments; + std::vector segments_merged; // messaging SubMaster *sm; PubMaster *pm; - QVector socks; + std::set socks; VisionIpcServer *vipc_server = nullptr; + std::unique_ptr route_; }; diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc new file mode 100644 index 0000000000..be8af108db --- /dev/null +++ b/selfdrive/ui/replay/route.cc @@ -0,0 +1,153 @@ +#include "selfdrive/ui/replay/route.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "selfdrive/hardware/hw.h" +#include "selfdrive/ui/qt/api.h" + +Route::Route(const QString &route) : route_(route) {} + +bool Route::load() { + QEventLoop loop; + auto onError = [&loop](const QString &err) { + qInfo() << err; + loop.quit(); + }; + + bool ret = false; + HttpRequest http(nullptr, !Hardware::PC()); + QObject::connect(&http, &HttpRequest::failedResponse, onError); + QObject::connect(&http, &HttpRequest::timeoutResponse, onError); + QObject::connect(&http, &HttpRequest::receivedResponse, [&](const QString json) { + ret = loadFromJson(json); + loop.quit(); + }); + http.sendRequest("https://api.commadotai.com/v1/route/" + route_ + "/files"); + loop.exec(); + return ret; +} + +bool Route::loadFromJson(const QString &json) { + QJsonObject route_files = QJsonDocument::fromJson(json.trimmed().toUtf8()).object(); + if (route_files.empty()) { + qInfo() << "JSON Parse failed"; + return false; + } + + QRegExp rx(R"(\/(\d+)\/)"); + for (const QString &key : route_files.keys()) { + for (const auto &url : route_files[key].toArray()) { + QString url_str = url.toString(); + if (rx.indexIn(url_str) != -1) { + const int seg_num = rx.cap(1).toInt(); + if (segments_.size() <= seg_num) { + segments_.resize(seg_num + 1); + } + if (key == "logs") { + segments_[seg_num].rlog = url_str; + } else if (key == "qlogs") { + segments_[seg_num].qlog = url_str; + } else if (key == "cameras") { + segments_[seg_num].road_cam = url_str; + } else if (key == "dcameras") { + segments_[seg_num].driver_cam = url_str; + } else if (key == "ecameras") { + segments_[seg_num].wide_road_cam = url_str; + } else if (key == "qcameras") { + segments_[seg_num].qcamera = url_str; + } + } + } + } + return true; +} + +// class Segment + +Segment::Segment(int n, const SegmentFile &segment_files) : seg_num_(n), files_(segment_files) { + static std::once_flag once_flag; + std::call_once(once_flag, [=]() { + if (!QDir(CACHE_DIR).exists()) QDir().mkdir(CACHE_DIR); + }); + + // fallback to qcamera + road_cam_path_ = files_.road_cam.isEmpty() ? files_.qcamera : files_.road_cam; + valid_ = !files_.rlog.isEmpty() && !road_cam_path_.isEmpty(); + if (!valid_) return; + + if (!QUrl(files_.rlog).isLocalFile()) { + for (auto &url : {files_.rlog, road_cam_path_, files_.driver_cam, files_.wide_road_cam}) { + if (!url.isEmpty() && !QFile::exists(localPath(url))) { + qDebug() << "download" << url; + downloadFile(url); + ++downloading_; + } + } + } + if (downloading_ == 0) { + QTimer::singleShot(0, this, &Segment::load); + } +} + +Segment::~Segment() { + // cancel download, qnam will not abort requests, need to abort them manually + aborting_ = true; + for (QNetworkReply *replay : replies_) { + if (replay->isRunning()) { + replay->abort(); + } + replay->deleteLater(); + } +} + +void Segment::downloadFile(const QString &url) { + QNetworkReply *reply = qnam_.get(QNetworkRequest(url)); + replies_.insert(reply); + connect(reply, &QNetworkReply::finished, [=]() { + if (reply->error() == QNetworkReply::NoError) { + QFile file(localPath(url)); + if (file.open(QIODevice::WriteOnly)) { + file.write(reply->readAll()); + } + } + if (--downloading_ == 0 && !aborting_) { + load(); + } + }); +} + +// load concurrency +void Segment::load() { + std::vector> futures; + futures.emplace_back(std::async(std::launch::async, [=]() { + log = std::make_unique(); + return log->load(localPath(files_.rlog).toStdString()); + })); + + QString camera_files[] = {road_cam_path_, files_.driver_cam, files_.wide_road_cam}; + for (int i = 0; i < std::size(camera_files); ++i) { + if (!camera_files[i].isEmpty()) { + futures.emplace_back(std::async(std::launch::async, [=]() { + frames[i] = std::make_unique(); + return frames[i]->load(localPath(camera_files[i]).toStdString()); + })); + } + } + + int success_cnt = std::accumulate(futures.begin(), futures.end(), 0, [=](int v, auto &f) { return f.get() + v; }); + loaded_ = valid_ = (success_cnt == futures.size()); + emit loadFinished(); +} + +QString Segment::localPath(const QUrl &url) { + if (url.isLocalFile()) return url.toString(); + + QByteArray url_no_query = url.toString(QUrl::RemoveQuery).toUtf8(); + return CACHE_DIR + QString(QCryptographicHash::hash(url_no_query, QCryptographicHash::Sha256).toHex()); +} diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h new file mode 100644 index 0000000000..07616d195c --- /dev/null +++ b/selfdrive/ui/replay/route.h @@ -0,0 +1,65 @@ +#pragma once + +#include +#include +#include + +#include "selfdrive/common/util.h" +#include "selfdrive/ui/replay/filereader.h" +#include "selfdrive/ui/replay/framereader.h" + +const QString CACHE_DIR = util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str(); + +struct SegmentFile { + QString rlog; + QString qlog; + QString road_cam; + QString driver_cam; + QString wide_road_cam; + QString qcamera; +}; + +class Route { +public: + Route(const QString &route); + bool load(); + + inline const QString &name() const { return route_; }; + inline int size() const { return segments_.size(); } + inline SegmentFile &at(int n) { return segments_[n]; } + +protected: + bool loadFromJson(const QString &json); + QString route_; + std::vector segments_; +}; + +class Segment : public QObject { + Q_OBJECT + +public: + Segment(int n, const SegmentFile &segment_files); + ~Segment(); + inline bool isValid() const { return valid_; }; + inline bool isLoaded() const { return loaded_; } + + std::unique_ptr log; + std::unique_ptr frames[MAX_CAMERAS] = {}; + +signals: + void loadFinished(); + +protected: + void load(); + void downloadFile(const QString &url); + QString localPath(const QUrl &url); + + bool loaded_ = false, valid_ = false; + bool aborting_ = false; + int downloading_ = 0; + int seg_num_ = 0; + SegmentFile files_; + QString road_cam_path_; + QSet replies_; + QNetworkAccessManager qnam_; +}; diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index 95fd4874a0..d0c88d7ca0 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -1,18 +1,15 @@ #define CATCH_CONFIG_MAIN #include "catch2/catch.hpp" - #include "selfdrive/ui/replay/framereader.h" const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; TEST_CASE("FrameReader") { SECTION("process&get") { - FrameReader fr(stream_url); - bool ret = fr.process(); - REQUIRE(ret == true); + FrameReader fr; + REQUIRE(fr.load(stream_url) == true); REQUIRE(fr.valid() == true); REQUIRE(fr.getFrameCount() == 1200); - // random get 50 frames // srand(time(NULL)); // for (int i = 0; i < 50; ++i) { @@ -24,11 +21,4 @@ TEST_CASE("FrameReader") { REQUIRE(fr.get(i) != nullptr); } } - SECTION("process with timeout") { - FrameReader fr(stream_url, 1); - bool ret = fr.process(); - REQUIRE(ret == false); - REQUIRE(fr.valid() == false); - REQUIRE(fr.getFrameCount() < 1200); - } }