diff --git a/selfdrive/ui/replay/filereader.cc b/selfdrive/ui/replay/filereader.cc index e005144f8c..31ccadf0cc 100644 --- a/selfdrive/ui/replay/filereader.cc +++ b/selfdrive/ui/replay/filereader.cc @@ -1,139 +1,132 @@ #include "selfdrive/ui/replay/filereader.h" +#include #include -FileReader::FileReader(const QString& file_) : file(file_) { -} +static bool decompressBZ2(std::vector &dest, const char srcData[], size_t srcSize, + size_t outputSizeIncrement = 0x100000U) { + bz_stream strm = {}; + int ret = BZ2_bzDecompressInit(&strm, 0, 0); + assert(ret == BZ_OK); + + strm.next_in = const_cast(srcData); + strm.avail_in = srcSize; + do { + strm.next_out = (char *)&dest[strm.total_out_lo32]; + strm.avail_out = dest.size() - strm.total_out_lo32; + ret = BZ2_bzDecompress(&strm); + if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) { + dest.resize(dest.size() + outputSizeIncrement); + } + } while (ret == BZ_OK); -void FileReader::process() { - timer.start(); - QString str = file.simplified(); - str.replace(" ", ""); - startRequest(QUrl(str)); + BZ2_bzDecompressEnd(&strm); + dest.resize(strm.total_out_lo32); + return ret == BZ_STREAM_END; } -void FileReader::startRequest(const QUrl &url) { - qnam = new QNetworkAccessManager; - reply = qnam->get(QNetworkRequest(url)); - connect(reply, &QNetworkReply::finished, this, &FileReader::httpFinished); - connect(reply, &QIODevice::readyRead, this, &FileReader::readyRead); - qDebug() << "requesting" << url; -} +// class FileReader -void FileReader::httpFinished() { - if (reply->error()) { - qWarning() << reply->errorString(); - } +FileReader::FileReader(const QString &fn, QObject *parent) : url_(fn), QObject(parent) {} - const QVariant redirectionTarget = reply->attribute(QNetworkRequest::RedirectionTargetAttribute); - if (!redirectionTarget.isNull()) { - const QUrl redirectedUrl = redirectionTarget.toUrl(); - //qDebug() << "redirected to" << redirectedUrl; - startRequest(redirectedUrl); +void FileReader::read() { + if (url_.isLocalFile()) { + QFile file(url_.toLocalFile()); + if (file.open(QIODevice::ReadOnly)) { + emit finished(file.readAll()); + } else { + emit failed(QString("Failed to read file %1").arg(url_.toString())); + } } else { - qDebug() << "done in" << timer.elapsed() << "ms"; - done(); + startHttpRequest(); } } -void FileReader::readyRead() { - QByteArray dat = reply->readAll(); - printf("got http ready read: %d\n", dat.size()); +void FileReader::startHttpRequest() { + QNetworkAccessManager *qnam = new QNetworkAccessManager(this); + QNetworkRequest request(url_); + request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true); + reply_ = qnam->get(request); + connect(reply_, &QNetworkReply::finished, [=]() { + if (!reply_->error()) { + emit finished(reply_->readAll()); + } else { + emit failed(reply_->errorString()); + } + reply_->deleteLater(); + reply_ = nullptr; + }); } -FileReader::~FileReader() { - +void FileReader::abort() { + if (reply_) reply_->abort(); } -LogReader::LogReader(const QString& file, Events *events_, QReadWriteLock* events_lock_, QMap > *eidx_) : - FileReader(file), events(events_), events_lock(events_lock_), eidx(eidx_) { - bStream.next_in = NULL; - bStream.avail_in = 0; - bStream.bzalloc = NULL; - bStream.bzfree = NULL; - bStream.opaque = NULL; +// class LogReader - int ret = BZ2_bzDecompressInit(&bStream, 0, 0); - if (ret != BZ_OK) qWarning() << "bz2 init failed"; - - // start with 64MB buffer - raw.resize(1024*1024*64); - - // auto increment? - bStream.next_out = raw.data(); - bStream.avail_out = raw.size(); - - // parsed no events yet - event_offset = 0; - - parser = new std::thread([&]() { - while (true) { - mergeEvents(cdled.get()); - } +LogReader::LogReader(const QString &file, QObject *parent) : QObject(parent) { + file_reader_ = new FileReader(file); + file_reader_->moveToThread(&thread_); + connect(&thread_, &QThread::started, file_reader_, &FileReader::read); + connect(&thread_, &QThread::finished, file_reader_, &FileReader::deleteLater); + connect(file_reader_, &FileReader::finished, [=](const QByteArray &dat) { + parseEvents(dat); + }); + connect(file_reader_, &FileReader::failed, [=](const QString &err) { + qDebug() << err; }); + thread_.start(); } LogReader::~LogReader() { - delete parser; + // wait until thread is finished. + exit_ = true; + file_reader_->abort(); + thread_.quit(); + thread_.wait(); + + // clear events + for (auto e : events) { + delete e; + } } -void LogReader::mergeEvents(int dled) { - auto amsg = kj::arrayPtr((const capnp::word*)(raw.data() + event_offset), (dled-event_offset)/sizeof(capnp::word)); - Events events_local; - QMap > eidx_local; - - while (amsg.size() > 0) { - try { - capnp::FlatArrayMessageReader cmsg = capnp::FlatArrayMessageReader(amsg); - - // this needed? it is - capnp::FlatArrayMessageReader *tmsg = - new capnp::FlatArrayMessageReader(kj::arrayPtr(amsg.begin(), cmsg.getEnd())); - - amsg = kj::arrayPtr(cmsg.getEnd(), amsg.end()); +void LogReader::parseEvents(const QByteArray &dat) { + raw_.resize(1024 * 1024 * 64); + if (!decompressBZ2(raw_, dat.data(), dat.size())) { + qWarning() << "bz2 decompress failed"; + } - cereal::Event::Reader event = tmsg->getRoot(); - events_local.insert(event.getLogMonoTime(), event); + auto insertEidx = [&](CameraType type, const cereal::EncodeIndex::Reader &e) { + eidx[type][e.getFrameId()] = {e.getSegmentNum(), e.getSegmentId()}; + }; - // hack - // TODO: rewrite with callback - if (event.which() == cereal::Event::ROAD_ENCODE_IDX) { - auto ee = event.getRoadEncodeIdx(); - eidx_local.insert(ee.getFrameId(), qMakePair(ee.getSegmentNum(), ee.getSegmentId())); + valid_ = true; + kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); + while (!exit_ && words.size() > 0) { + try { + std::unique_ptr evt = std::make_unique(words); + switch (evt->which) { + case cereal::Event::ROAD_ENCODE_IDX: + insertEidx(RoadCam, evt->event.getRoadEncodeIdx()); + break; + case cereal::Event::DRIVER_ENCODE_IDX: + insertEidx(DriverCam, evt->event.getDriverEncodeIdx()); + break; + case cereal::Event::WIDE_ROAD_ENCODE_IDX: + insertEidx(WideRoadCam, evt->event.getWideRoadEncodeIdx()); + break; + default: + break; } - - // increment - event_offset = (char*)cmsg.getEnd() - raw.data(); - } catch (const kj::Exception& e) { - // partial messages trigger this - //qDebug() << e.getDescription().cStr(); + words = kj::arrayPtr(evt->reader.getEnd(), words.end()); + events.insert(evt->mono_time, evt.release()); + } catch (const kj::Exception &e) { + valid_ = false; break; } } - - // merge in events - // TODO: add lock - events_lock->lockForWrite(); - *events += events_local; - eidx->unite(eidx_local); - events_lock->unlock(); -} - -void LogReader::readyRead() { - QByteArray dat = reply->readAll(); - - bStream.next_in = dat.data(); - bStream.avail_in = dat.size(); - - while (bStream.avail_in > 0) { - int ret = BZ2_bzDecompress(&bStream); - if (ret != BZ_OK && ret != BZ_STREAM_END) { - qWarning() << "bz2 decompress failed"; - break; - } + if (!exit_) { + emit finished(valid_); } - - int dled = raw.size() - bStream.avail_out; - cdled.put(dled); } - diff --git a/selfdrive/ui/replay/filereader.h b/selfdrive/ui/replay/filereader.h index d1d490184b..81c0a3e9fd 100644 --- a/selfdrive/ui/replay/filereader.h +++ b/selfdrive/ui/replay/filereader.h @@ -1,70 +1,87 @@ #pragma once -#include +#include +#include #include #include #include -#include #include -#include -#include +#include -#include #include -#include -#include "cereal/gen/cpp/log.capnp.h" -#include "tools/clib/channel.h" +#include "cereal/gen/cpp/log.capnp.h" class FileReader : public QObject { Q_OBJECT public: - FileReader(const QString& file_); - void startRequest(const QUrl &url); - ~FileReader(); - virtual void readyRead(); - void httpFinished(); - virtual void done() {}; + FileReader(const QString &fn, QObject *parent = nullptr); + void read(); + void abort(); -public slots: - void process(); - -protected: - QNetworkReply *reply; +signals: + void finished(const QByteArray &dat); + void failed(const QString &err); private: - QNetworkAccessManager *qnam; - QElapsedTimer timer; - QString file; + void startHttpRequest(); + QNetworkReply *reply_ = nullptr; + QUrl url_; }; -typedef QMultiMap Events; +enum CameraType { + RoadCam = 0, + DriverCam, + WideRoadCam +}; +const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; +const int MAX_CAMERAS = std::size(ALL_CAMERAS); -class LogReader : public FileReader { -Q_OBJECT +struct EncodeIdx { + int segmentNum; + uint32_t frameEncodeId; +}; + +class Event { public: - LogReader(const QString& file, Events *, QReadWriteLock* events_lock_, QMap > *eidx_); + Event(const kj::ArrayPtr &amsg) : reader(amsg) { + words = kj::ArrayPtr(amsg.begin(), reader.getEnd()); + event = reader.getRoot(); + which = event.which(); + mono_time = event.getLogMonoTime(); + } + inline kj::ArrayPtr bytes() const { return words.asBytes(); } + + uint64_t mono_time; + cereal::Event::Which which; + cereal::Event::Reader event; + capnp::FlatArrayMessageReader reader; + kj::ArrayPtr words; +}; + +class LogReader : public QObject { + Q_OBJECT + +public: + LogReader(const QString &file, QObject *parent = nullptr); ~LogReader(); + inline bool valid() const { return valid_; } - void readyRead(); - void done() { is_done = true; }; - bool is_done = false; + QMultiMap events; + std::unordered_map eidx[MAX_CAMERAS] = {}; -private: - bz_stream bStream; +signals: + void finished(bool success); - // backing store - QByteArray raw; +private: + void parseEvents(const QByteArray &dat); - std::thread *parser; - int event_offset; - channel cdled; + std::atomic exit_ = false; + std::atomic valid_ = false; + std::vector raw_; - // global - void mergeEvents(int dled); - Events *events; - QReadWriteLock* events_lock; - QMap > *eidx; + FileReader *file_reader_ = nullptr; + QThread thread_; }; diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 95fad13522..cf3944cc45 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -69,14 +69,11 @@ void Replay::addSegment(int n) { return; } - QThread *t = new QThread; - lrs.insert(n, new LogReader(log_paths.at(n).toString(), &events, &events_lock, &eidx)); + lrs[n] = new LogReader(log_paths.at(n).toString()); + // this is a queued connection, mergeEvents is executed in the main thread. + QObject::connect(lrs[n], &LogReader::finished, this, &Replay::mergeEvents); - lrs[n]->moveToThread(t); - QObject::connect(t, &QThread::started, lrs[n], &LogReader::process); - t->start(); - - frs[n] = new FrameReader(qPrintable(camera_paths.at(n).toString()), this); + frs[n] = new FrameReader(qPrintable(camera_paths.at(n).toString())); } void Replay::removeSegment(int n) { @@ -104,6 +101,14 @@ void Replay::removeSegment(int n) { } } +void Replay::mergeEvents() { + LogReader *log = qobject_cast(sender()); + events += log->events; + for (CameraType cam_type : ALL_CAMERAS) { + eidx[cam_type].merge(log->eidx[cam_type]); + } +} + void Replay::start(){ thread = new QThread; QObject::connect(thread, &QThread::started, [=](){ @@ -212,7 +217,7 @@ void Replay::stream() { uint64_t t0r = timer.nsecsElapsed(); while ((eit != events.end()) && seek_ts < 0) { - cereal::Event::Reader e = (*eit); + cereal::Event::Reader e = (*eit)->event; std::string type; KJ_IF_MAYBE(e_, static_cast(e).which()) { type = e_->getProto().getName(); @@ -242,11 +247,11 @@ void Replay::stream() { if (type == "roadCameraState") { auto fr = e.getRoadCameraState(); - auto it_ = eidx.find(fr.getFrameId()); - if (it_ != eidx.end()) { - auto pp = *it_; - if (frs.find(pp.first) != frs.end()) { - auto frm = frs[pp.first]; + auto it_ = eidx[RoadCam].find(fr.getFrameId()); + if (it_ != eidx[RoadCam].end()) { + EncodeIdx &e = it_->second; + if (frs.find(e.segmentNum) != frs.end()) { + auto frm = frs[e.segmentNum]; if (vipc_server == nullptr) { cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); @@ -257,7 +262,7 @@ void Replay::stream() { vipc_server->start_listener(); } - uint8_t *dat = frm->get(pp.second); + uint8_t *dat = frm->get(e.frameEncodeId); if (dat) { VisionIpcBufExtra extra = {}; VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); @@ -270,11 +275,8 @@ void Replay::stream() { // publish msg if (sm == nullptr) { - capnp::MallocMessageBuilder msg; - msg.setRoot(e); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - pm->send(type.c_str(), (unsigned char*)bytes.begin(), bytes.size()); + auto bytes = (*eit)->bytes(); + pm->send(type.c_str(), (capnp::byte *)bytes.begin(), bytes.size()); } else { std::vector> messages; messages.push_back({type, e}); diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 11d634108d..a6c9cfbb76 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -35,6 +36,7 @@ public slots: void keyboardThread(); void segmentQueueThread(); void parseResponse(const QString &response); + void mergeEvents(); private: float last_print = 0; @@ -48,9 +50,9 @@ private: QThread *queue_thread; // logs - Events events; + QMultiMap events; QReadWriteLock events_lock; - QMap> eidx; + std::unordered_map eidx[MAX_CAMERAS]; HttpRequest *http; QJsonArray camera_paths;