Refactor C++ LogReader (#21152)

* refactor logreader

* fix include order

* apply review

* remove typedefs

* remove include timing.h

* no parent

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: 8c202d1bb4
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent e07f1a1ee4
commit d852c98d52
  1. 207
      selfdrive/ui/replay/filereader.cc
  2. 99
      selfdrive/ui/replay/filereader.h
  3. 40
      selfdrive/ui/replay/replay.cc
  4. 6
      selfdrive/ui/replay/replay.h

@ -1,139 +1,132 @@
#include "selfdrive/ui/replay/filereader.h" #include "selfdrive/ui/replay/filereader.h"
#include <bzlib.h>
#include <QtNetwork> #include <QtNetwork>
FileReader::FileReader(const QString& file_) : file(file_) { static bool decompressBZ2(std::vector<uint8_t> &dest, const char srcData[], size_t srcSize,
} size_t outputSizeIncrement = 0x100000U) {
bz_stream strm = {};
int ret = BZ2_bzDecompressInit(&strm, 0, 0);
assert(ret == BZ_OK);
strm.next_in = const_cast<char *>(srcData);
strm.avail_in = srcSize;
do {
strm.next_out = (char *)&dest[strm.total_out_lo32];
strm.avail_out = dest.size() - strm.total_out_lo32;
ret = BZ2_bzDecompress(&strm);
if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
dest.resize(dest.size() + outputSizeIncrement);
}
} while (ret == BZ_OK);
void FileReader::process() { BZ2_bzDecompressEnd(&strm);
timer.start(); dest.resize(strm.total_out_lo32);
QString str = file.simplified(); return ret == BZ_STREAM_END;
str.replace(" ", "");
startRequest(QUrl(str));
} }
void FileReader::startRequest(const QUrl &url) { // class FileReader
qnam = new QNetworkAccessManager;
reply = qnam->get(QNetworkRequest(url));
connect(reply, &QNetworkReply::finished, this, &FileReader::httpFinished);
connect(reply, &QIODevice::readyRead, this, &FileReader::readyRead);
qDebug() << "requesting" << url;
}
void FileReader::httpFinished() { FileReader::FileReader(const QString &fn, QObject *parent) : url_(fn), QObject(parent) {}
if (reply->error()) {
qWarning() << reply->errorString();
}
const QVariant redirectionTarget = reply->attribute(QNetworkRequest::RedirectionTargetAttribute); void FileReader::read() {
if (!redirectionTarget.isNull()) { if (url_.isLocalFile()) {
const QUrl redirectedUrl = redirectionTarget.toUrl(); QFile file(url_.toLocalFile());
//qDebug() << "redirected to" << redirectedUrl; if (file.open(QIODevice::ReadOnly)) {
startRequest(redirectedUrl); emit finished(file.readAll());
} else {
emit failed(QString("Failed to read file %1").arg(url_.toString()));
}
} else { } else {
qDebug() << "done in" << timer.elapsed() << "ms"; startHttpRequest();
done();
} }
} }
void FileReader::readyRead() { void FileReader::startHttpRequest() {
QByteArray dat = reply->readAll(); QNetworkAccessManager *qnam = new QNetworkAccessManager(this);
printf("got http ready read: %d\n", dat.size()); QNetworkRequest request(url_);
request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true);
reply_ = qnam->get(request);
connect(reply_, &QNetworkReply::finished, [=]() {
if (!reply_->error()) {
emit finished(reply_->readAll());
} else {
emit failed(reply_->errorString());
}
reply_->deleteLater();
reply_ = nullptr;
});
} }
FileReader::~FileReader() { void FileReader::abort() {
if (reply_) reply_->abort();
} }
LogReader::LogReader(const QString& file, Events *events_, QReadWriteLock* events_lock_, QMap<int, QPair<int, int> > *eidx_) : // class LogReader
FileReader(file), events(events_), events_lock(events_lock_), eidx(eidx_) {
bStream.next_in = NULL;
bStream.avail_in = 0;
bStream.bzalloc = NULL;
bStream.bzfree = NULL;
bStream.opaque = NULL;
int ret = BZ2_bzDecompressInit(&bStream, 0, 0); LogReader::LogReader(const QString &file, QObject *parent) : QObject(parent) {
if (ret != BZ_OK) qWarning() << "bz2 init failed"; file_reader_ = new FileReader(file);
file_reader_->moveToThread(&thread_);
// start with 64MB buffer connect(&thread_, &QThread::started, file_reader_, &FileReader::read);
raw.resize(1024*1024*64); connect(&thread_, &QThread::finished, file_reader_, &FileReader::deleteLater);
connect(file_reader_, &FileReader::finished, [=](const QByteArray &dat) {
// auto increment? parseEvents(dat);
bStream.next_out = raw.data(); });
bStream.avail_out = raw.size(); connect(file_reader_, &FileReader::failed, [=](const QString &err) {
qDebug() << err;
// parsed no events yet
event_offset = 0;
parser = new std::thread([&]() {
while (true) {
mergeEvents(cdled.get());
}
}); });
thread_.start();
} }
LogReader::~LogReader() { LogReader::~LogReader() {
delete parser; // wait until thread is finished.
exit_ = true;
file_reader_->abort();
thread_.quit();
thread_.wait();
// clear events
for (auto e : events) {
delete e;
}
} }
void LogReader::mergeEvents(int dled) { void LogReader::parseEvents(const QByteArray &dat) {
auto amsg = kj::arrayPtr((const capnp::word*)(raw.data() + event_offset), (dled-event_offset)/sizeof(capnp::word)); raw_.resize(1024 * 1024 * 64);
Events events_local; if (!decompressBZ2(raw_, dat.data(), dat.size())) {
QMap<int, QPair<int, int> > eidx_local; qWarning() << "bz2 decompress failed";
}
while (amsg.size() > 0) {
try {
capnp::FlatArrayMessageReader cmsg = capnp::FlatArrayMessageReader(amsg);
// this needed? it is
capnp::FlatArrayMessageReader *tmsg =
new capnp::FlatArrayMessageReader(kj::arrayPtr(amsg.begin(), cmsg.getEnd()));
amsg = kj::arrayPtr(cmsg.getEnd(), amsg.end());
cereal::Event::Reader event = tmsg->getRoot<cereal::Event>(); auto insertEidx = [&](CameraType type, const cereal::EncodeIndex::Reader &e) {
events_local.insert(event.getLogMonoTime(), event); eidx[type][e.getFrameId()] = {e.getSegmentNum(), e.getSegmentId()};
};
// hack valid_ = true;
// TODO: rewrite with callback kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
if (event.which() == cereal::Event::ROAD_ENCODE_IDX) { while (!exit_ && words.size() > 0) {
auto ee = event.getRoadEncodeIdx(); try {
eidx_local.insert(ee.getFrameId(), qMakePair(ee.getSegmentNum(), ee.getSegmentId())); std::unique_ptr<Event> evt = std::make_unique<Event>(words);
switch (evt->which) {
case cereal::Event::ROAD_ENCODE_IDX:
insertEidx(RoadCam, evt->event.getRoadEncodeIdx());
break;
case cereal::Event::DRIVER_ENCODE_IDX:
insertEidx(DriverCam, evt->event.getDriverEncodeIdx());
break;
case cereal::Event::WIDE_ROAD_ENCODE_IDX:
insertEidx(WideRoadCam, evt->event.getWideRoadEncodeIdx());
break;
default:
break;
} }
words = kj::arrayPtr(evt->reader.getEnd(), words.end());
// increment events.insert(evt->mono_time, evt.release());
event_offset = (char*)cmsg.getEnd() - raw.data(); } catch (const kj::Exception &e) {
} catch (const kj::Exception& e) { valid_ = false;
// partial messages trigger this
//qDebug() << e.getDescription().cStr();
break; break;
} }
} }
if (!exit_) {
// merge in events emit finished(valid_);
// TODO: add lock
events_lock->lockForWrite();
*events += events_local;
eidx->unite(eidx_local);
events_lock->unlock();
}
void LogReader::readyRead() {
QByteArray dat = reply->readAll();
bStream.next_in = dat.data();
bStream.avail_in = dat.size();
while (bStream.avail_in > 0) {
int ret = BZ2_bzDecompress(&bStream);
if (ret != BZ_OK && ret != BZ_STREAM_END) {
qWarning() << "bz2 decompress failed";
break;
}
} }
int dled = raw.size() - bStream.avail_out;
cdled.put(dled);
} }

@ -1,70 +1,87 @@
#pragma once #pragma once
#include <thread> #include <unordered_map>
#include <vector>
#include <QElapsedTimer> #include <QElapsedTimer>
#include <QMultiMap> #include <QMultiMap>
#include <QNetworkAccessManager> #include <QNetworkAccessManager>
#include <QReadWriteLock>
#include <QString> #include <QString>
#include <QVector> #include <QThread>
#include <QWidget>
#include <bzlib.h>
#include <capnp/serialize.h> #include <capnp/serialize.h>
#include <kj/io.h>
#include "cereal/gen/cpp/log.capnp.h"
#include "tools/clib/channel.h" #include "cereal/gen/cpp/log.capnp.h"
class FileReader : public QObject { class FileReader : public QObject {
Q_OBJECT Q_OBJECT
public: public:
FileReader(const QString& file_); FileReader(const QString &fn, QObject *parent = nullptr);
void startRequest(const QUrl &url); void read();
~FileReader(); void abort();
virtual void readyRead();
void httpFinished();
virtual void done() {};
public slots: signals:
void process(); void finished(const QByteArray &dat);
void failed(const QString &err);
protected:
QNetworkReply *reply;
private: private:
QNetworkAccessManager *qnam; void startHttpRequest();
QElapsedTimer timer; QNetworkReply *reply_ = nullptr;
QString file; QUrl url_;
}; };
typedef QMultiMap<uint64_t, cereal::Event::Reader> Events; enum CameraType {
RoadCam = 0,
DriverCam,
WideRoadCam
};
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
class LogReader : public FileReader { struct EncodeIdx {
Q_OBJECT int segmentNum;
uint32_t frameEncodeId;
};
class Event {
public: public:
LogReader(const QString& file, Events *, QReadWriteLock* events_lock_, QMap<int, QPair<int, int> > *eidx_); Event(const kj::ArrayPtr<const capnp::word> &amsg) : reader(amsg) {
words = kj::ArrayPtr<const capnp::word>(amsg.begin(), reader.getEnd());
event = reader.getRoot<cereal::Event>();
which = event.which();
mono_time = event.getLogMonoTime();
}
inline kj::ArrayPtr<const capnp::byte> bytes() const { return words.asBytes(); }
uint64_t mono_time;
cereal::Event::Which which;
cereal::Event::Reader event;
capnp::FlatArrayMessageReader reader;
kj::ArrayPtr<const capnp::word> words;
};
class LogReader : public QObject {
Q_OBJECT
public:
LogReader(const QString &file, QObject *parent = nullptr);
~LogReader(); ~LogReader();
inline bool valid() const { return valid_; }
void readyRead(); QMultiMap<uint64_t, Event*> events;
void done() { is_done = true; }; std::unordered_map<uint32_t, EncodeIdx> eidx[MAX_CAMERAS] = {};
bool is_done = false;
private: signals:
bz_stream bStream; void finished(bool success);
// backing store private:
QByteArray raw; void parseEvents(const QByteArray &dat);
std::thread *parser; std::atomic<bool> exit_ = false;
int event_offset; std::atomic<bool> valid_ = false;
channel<int> cdled; std::vector<uint8_t> raw_;
// global FileReader *file_reader_ = nullptr;
void mergeEvents(int dled); QThread thread_;
Events *events;
QReadWriteLock* events_lock;
QMap<int, QPair<int, int> > *eidx;
}; };

@ -69,14 +69,11 @@ void Replay::addSegment(int n) {
return; return;
} }
QThread *t = new QThread; lrs[n] = new LogReader(log_paths.at(n).toString());
lrs.insert(n, new LogReader(log_paths.at(n).toString(), &events, &events_lock, &eidx)); // this is a queued connection, mergeEvents is executed in the main thread.
QObject::connect(lrs[n], &LogReader::finished, this, &Replay::mergeEvents);
lrs[n]->moveToThread(t); frs[n] = new FrameReader(qPrintable(camera_paths.at(n).toString()));
QObject::connect(t, &QThread::started, lrs[n], &LogReader::process);
t->start();
frs[n] = new FrameReader(qPrintable(camera_paths.at(n).toString()), this);
} }
void Replay::removeSegment(int n) { void Replay::removeSegment(int n) {
@ -104,6 +101,14 @@ void Replay::removeSegment(int n) {
} }
} }
void Replay::mergeEvents() {
LogReader *log = qobject_cast<LogReader *>(sender());
events += log->events;
for (CameraType cam_type : ALL_CAMERAS) {
eidx[cam_type].merge(log->eidx[cam_type]);
}
}
void Replay::start(){ void Replay::start(){
thread = new QThread; thread = new QThread;
QObject::connect(thread, &QThread::started, [=](){ QObject::connect(thread, &QThread::started, [=](){
@ -212,7 +217,7 @@ void Replay::stream() {
uint64_t t0r = timer.nsecsElapsed(); uint64_t t0r = timer.nsecsElapsed();
while ((eit != events.end()) && seek_ts < 0) { while ((eit != events.end()) && seek_ts < 0) {
cereal::Event::Reader e = (*eit); cereal::Event::Reader e = (*eit)->event;
std::string type; std::string type;
KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(e).which()) { KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(e).which()) {
type = e_->getProto().getName(); type = e_->getProto().getName();
@ -242,11 +247,11 @@ void Replay::stream() {
if (type == "roadCameraState") { if (type == "roadCameraState") {
auto fr = e.getRoadCameraState(); auto fr = e.getRoadCameraState();
auto it_ = eidx.find(fr.getFrameId()); auto it_ = eidx[RoadCam].find(fr.getFrameId());
if (it_ != eidx.end()) { if (it_ != eidx[RoadCam].end()) {
auto pp = *it_; EncodeIdx &e = it_->second;
if (frs.find(pp.first) != frs.end()) { if (frs.find(e.segmentNum) != frs.end()) {
auto frm = frs[pp.first]; auto frm = frs[e.segmentNum];
if (vipc_server == nullptr) { if (vipc_server == nullptr) {
cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));
@ -257,7 +262,7 @@ void Replay::stream() {
vipc_server->start_listener(); vipc_server->start_listener();
} }
uint8_t *dat = frm->get(pp.second); uint8_t *dat = frm->get(e.frameEncodeId);
if (dat) { if (dat) {
VisionIpcBufExtra extra = {}; VisionIpcBufExtra extra = {};
VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK);
@ -270,11 +275,8 @@ void Replay::stream() {
// publish msg // publish msg
if (sm == nullptr) { if (sm == nullptr) {
capnp::MallocMessageBuilder msg; auto bytes = (*eit)->bytes();
msg.setRoot(e); pm->send(type.c_str(), (capnp::byte *)bytes.begin(), bytes.size());
auto words = capnp::messageToFlatArray(msg);
auto bytes = words.asBytes();
pm->send(type.c_str(), (unsigned char*)bytes.begin(), bytes.size());
} else { } else {
std::vector<std::pair<std::string, cereal::Event::Reader>> messages; std::vector<std::pair<std::string, cereal::Event::Reader>> messages;
messages.push_back({type, e}); messages.push_back({type, e});

@ -4,6 +4,7 @@
#include <termios.h> #include <termios.h>
#include <QJsonArray> #include <QJsonArray>
#include <QReadWriteLock>
#include <QThread> #include <QThread>
#include <capnp/dynamic.h> #include <capnp/dynamic.h>
@ -35,6 +36,7 @@ public slots:
void keyboardThread(); void keyboardThread();
void segmentQueueThread(); void segmentQueueThread();
void parseResponse(const QString &response); void parseResponse(const QString &response);
void mergeEvents();
private: private:
float last_print = 0; float last_print = 0;
@ -48,9 +50,9 @@ private:
QThread *queue_thread; QThread *queue_thread;
// logs // logs
Events events; QMultiMap<uint64_t, Event*> events;
QReadWriteLock events_lock; QReadWriteLock events_lock;
QMap<int, QPair<int, int>> eidx; std::unordered_map<uint32_t, EncodeIdx> eidx[MAX_CAMERAS];
HttpRequest *http; HttpRequest *http;
QJsonArray camera_paths; QJsonArray camera_paths;

Loading…
Cancel
Save