replay: refactor `Event` to remove the readers (#32252)

Refactor struct Event to remove the MessageReader from it
pull/32155/head
Dean Lee 1 year ago committed by GitHub
parent 60c71580da
commit a824bd75ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 8
      tools/cabana/streams/replaystream.cc
  2. 6
      tools/cabana/videowidget.cc
  3. 2
      tools/replay/SConscript
  4. 14
      tools/replay/camera.cc
  5. 6
      tools/replay/camera.h
  6. 68
      tools/replay/logreader.cc
  7. 19
      tools/replay/logreader.h
  8. 36
      tools/replay/replay.cc
  9. 2
      tools/replay/tests/test_replay.cc
  10. 1
      tools/replay/util.cc

@ -36,7 +36,9 @@ void ReplayStream::mergeSegments() {
for (auto it = seg->log->events.cbegin(); it != seg->log->events.cend(); ++it) { for (auto it = seg->log->events.cbegin(); it != seg->log->events.cend(); ++it) {
if ((*it)->which == cereal::Event::Which::CAN) { if ((*it)->which == cereal::Event::Which::CAN) {
const uint64_t ts = (*it)->mono_time; const uint64_t ts = (*it)->mono_time;
for (const auto &c : (*it)->event.getCan()) { capnp::FlatArrayMessageReader reader((*it)->data);
auto event = reader.getRoot<cereal::Event>();
for (const auto &c : event.getCan()) {
new_events.push_back(newEvent(ts, c)); new_events.push_back(newEvent(ts, c));
} }
} }
@ -66,7 +68,9 @@ bool ReplayStream::eventFilter(const Event *event) {
static double prev_update_ts = 0; static double prev_update_ts = 0;
if (event->which == cereal::Event::Which::CAN) { if (event->which == cereal::Event::Which::CAN) {
double current_sec = event->mono_time / 1e9 - routeStartTime(); double current_sec = event->mono_time / 1e9 - routeStartTime();
for (const auto &c : event->event.getCan()) { capnp::FlatArrayMessageReader reader(event->data);
auto e = reader.getRoot<cereal::Event>();
for (const auto &c : e.getCan()) {
MessageId id = {.source = c.getSrc(), .address = c.getAddress()}; MessageId id = {.source = c.getSrc(), .address = c.getAddress()};
const auto dat = c.getDat(); const auto dat = c.getDat();
updateEvent(id, current_sec, (const uint8_t*)dat.begin(), dat.size()); updateEvent(id, current_sec, (const uint8_t*)dat.begin(), dat.size());

@ -263,7 +263,8 @@ void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
std::mutex mutex; std::mutex mutex;
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event *e) { QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event *e) {
if (e->which == cereal::Event::Which::THUMBNAIL) { if (e->which == cereal::Event::Which::THUMBNAIL) {
auto thumb = e->event.getThumbnail(); capnp::FlatArrayMessageReader reader(e->data);
auto thumb = reader.getRoot<cereal::Event>().getThumbnail();
auto data = thumb.getThumbnail(); auto data = thumb.getThumbnail();
if (QPixmap pm; pm.loadFromData(data.begin(), data.size(), "jpeg")) { if (QPixmap pm; pm.loadFromData(data.begin(), data.size(), "jpeg")) {
QPixmap scaled = pm.scaledToHeight(MIN_VIDEO_HEIGHT - THUMBNAIL_MARGIN * 2, Qt::SmoothTransformation); QPixmap scaled = pm.scaledToHeight(MIN_VIDEO_HEIGHT - THUMBNAIL_MARGIN * 2, Qt::SmoothTransformation);
@ -271,7 +272,8 @@ void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
thumbnails[thumb.getTimestampEof()] = scaled; thumbnails[thumb.getTimestampEof()] = scaled;
} }
} else if (e->which == cereal::Event::Which::CONTROLS_STATE) { } else if (e->which == cereal::Event::Which::CONTROLS_STATE) {
auto cs = e->event.getControlsState(); capnp::FlatArrayMessageReader reader(e->data);
auto cs = reader.getRoot<cereal::Event>().getControlsState();
if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 && if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 &&
cs.getAlertSize() != cereal::ControlsState::AlertSize::NONE) { cs.getAlertSize() != cereal::ControlsState::AlertSize::NONE) {
std::lock_guard lk(mutex); std::lock_guard lk(mutex);

@ -9,8 +9,6 @@ if arch == "Darwin":
else: else:
base_libs.append('OpenCL') base_libs.append('OpenCL')
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"] replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks) replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
Export('replay_lib') Export('replay_lib')

@ -1,7 +1,8 @@
#include "tools/replay/camera.h" #include "tools/replay/camera.h"
#include <capnp/dynamic.h>
#include <cassert> #include <cassert>
#include <tuple>
#include "third_party/linux/include/msm_media_info.h" #include "third_party/linux/include/msm_media_info.h"
#include "tools/replay/util.h" #include "tools/replay/util.h"
@ -57,9 +58,14 @@ void CameraServer::cameraThread(Camera &cam) {
}; };
while (true) { while (true) {
const auto [fr, eidx] = cam.queue.pop(); const auto [fr, event] = cam.queue.pop();
if (!fr) break; if (!fr) break;
capnp::FlatArrayMessageReader reader(event->data);
auto evt = reader.getRoot<cereal::Event>();
auto eidx = capnp::AnyStruct::Reader(evt).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (eidx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C) continue;
const int id = eidx.getSegmentId(); const int id = eidx.getSegmentId();
bool prefetched = (id == cam.cached_id && eidx.getSegmentNum() == cam.cached_seg); bool prefetched = (id == cam.cached_id && eidx.getSegmentNum() == cam.cached_seg);
auto yuv = prefetched ? cam.cached_buf : read_frame(fr, id); auto yuv = prefetched ? cam.cached_buf : read_frame(fr, id);
@ -83,7 +89,7 @@ void CameraServer::cameraThread(Camera &cam) {
} }
} }
void CameraServer::pushFrame(CameraType type, FrameReader *fr, const cereal::EncodeIndex::Reader &eidx) { void CameraServer::pushFrame(CameraType type, FrameReader *fr, const Event *event) {
auto &cam = cameras_[type]; auto &cam = cameras_[type];
if (cam.width != fr->width || cam.height != fr->height) { if (cam.width != fr->width || cam.height != fr->height) {
cam.width = fr->width; cam.width = fr->width;
@ -93,7 +99,7 @@ void CameraServer::pushFrame(CameraType type, FrameReader *fr, const cereal::Enc
} }
++publishing_; ++publishing_;
cam.queue.push({fr, eidx}); cam.queue.push({fr, event});
} }
void CameraServer::waitForSent() { void CameraServer::waitForSent() {

@ -1,7 +1,5 @@
#pragma once #pragma once
#include <unistd.h>
#include <memory> #include <memory>
#include <tuple> #include <tuple>
#include <utility> #include <utility>
@ -17,7 +15,7 @@ class CameraServer {
public: public:
CameraServer(std::pair<int, int> camera_size[MAX_CAMERAS] = nullptr); CameraServer(std::pair<int, int> camera_size[MAX_CAMERAS] = nullptr);
~CameraServer(); ~CameraServer();
void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx); void pushFrame(CameraType type, FrameReader* fr, const Event *event);
void waitForSent(); void waitForSent();
protected: protected:
@ -27,7 +25,7 @@ protected:
int width; int width;
int height; int height;
std::thread thread; std::thread thread;
SafeQueue<std::pair<FrameReader*, const cereal::EncodeIndex::Reader>> queue; SafeQueue<std::pair<FrameReader*, const Event *>> queue;
int cached_id = -1; int cached_id = -1;
int cached_seg = -1; int cached_seg = -1;
VisionBuf * cached_buf; VisionBuf * cached_buf;

@ -4,34 +4,7 @@
#include "tools/replay/filereader.h" #include "tools/replay/filereader.h"
#include "tools/replay/util.h" #include "tools/replay/util.h"
Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
words = kj::ArrayPtr<const capnp::word>(amsg.begin(), reader.getEnd());
event = reader.getRoot<cereal::Event>();
which = event.which();
mono_time = event.getLogMonoTime();
// 1) Send video data at t=timestampEof/timestampSof
// 2) Send encodeIndex packet at t=logMonoTime
if (frame) {
auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
// C2 only has eof set, and some older routes have neither
uint64_t sof = idx.getTimestampSof();
uint64_t eof = idx.getTimestampEof();
if (sof > 0) {
mono_time = sof;
} else if (eof > 0) {
mono_time = eof;
}
}
}
// class LogReader
LogReader::LogReader(size_t memory_pool_block_size) { LogReader::LogReader(size_t memory_pool_block_size) {
#ifdef HAS_MEMORY_RESOURCE
const size_t buf_size = sizeof(Event) * memory_pool_block_size;
mbr_ = std::make_unique<std::pmr::monotonic_buffer_resource>(buf_size);
#endif
events.reserve(memory_pool_block_size); events.reserve(memory_pool_block_size);
} }
@ -61,33 +34,28 @@ bool LogReader::parse(std::atomic<bool> *abort) {
try { try {
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (words.size() > 0 && !(abort && *abort)) { while (words.size() > 0 && !(abort && *abort)) {
#ifdef HAS_MEMORY_RESOURCE capnp::FlatArrayMessageReader reader(words);
Event *evt = new (mbr_.get()) Event(words); auto event = reader.getRoot<cereal::Event>();
#else auto which = event.which();
Event *evt = new Event(words); uint64_t mono_time = event.getLogMonoTime();
#endif auto event_data = kj::arrayPtr(words.begin(), reader.getEnd());
Event *evt = events.emplace_back(newEvent(which, mono_time, event_data));
// Add encodeIdx packet again as a frame packet for the video stream // Add encodeIdx packet again as a frame packet for the video stream
if (evt->which == cereal::Event::ROAD_ENCODE_IDX || if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
evt->which == cereal::Event::DRIVER_ENCODE_IDX || evt->which == cereal::Event::DRIVER_ENCODE_IDX ||
evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
#ifdef HAS_MEMORY_RESOURCE if (uint64_t sof = idx.getTimestampSof()) {
Event *frame_evt = new (mbr_.get()) Event(words, true); mono_time = sof;
#else }
Event *frame_evt = new Event(words, true); events.emplace_back(newEvent(which, mono_time, event_data, idx.getSegmentNum()));
#endif
events.push_back(frame_evt);
} }
words = kj::arrayPtr(evt->reader.getEnd(), words.end()); words = kj::arrayPtr(reader.getEnd(), words.end());
events.push_back(evt);
} }
} catch (const kj::Exception &e) { } catch (const kj::Exception &e) {
rWarning("failed to parse log : %s", e.getDescription().cStr()); rWarning("Failed to parse log : %s.\nRetrieved %zu events from corrupt log", e.getDescription().cStr(), events.size());
if (!events.empty()) {
rWarning("read %zu events from corrupt log", events.size());
}
} }
if (!events.empty() && !(abort && *abort)) { if (!events.empty() && !(abort && *abort)) {
@ -96,3 +64,11 @@ bool LogReader::parse(std::atomic<bool> *abort) {
} }
return false; return false;
} }
Event *LogReader::newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum) {
#ifdef HAS_MEMORY_RESOURCE
return new (&mbr_) Event(which, mono_time, words, eidx_segnum);
#else
return new Event(which, mono_time, words, eidx_segnum);
#endif
}

@ -4,7 +4,6 @@
#define HAS_MEMORY_RESOURCE 1 #define HAS_MEMORY_RESOURCE 1
#include <memory_resource> #include <memory_resource>
#endif #endif
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@ -18,13 +17,8 @@ const int DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE = 65000;
class Event { class Event {
public: public:
Event(cereal::Event::Which which, uint64_t mono_time) : reader(kj::ArrayPtr<capnp::word>{}) { Event(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &data, int eidx_segnum = -1)
// construct a dummy Event for binary search, e.g std::upper_bound : which(which), mono_time(mono_time), data(data), eidx_segnum(eidx_segnum) {}
this->which = which;
this->mono_time = mono_time;
}
Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame = false);
inline kj::ArrayPtr<const capnp::byte> bytes() const { return words.asBytes(); }
struct lessThan { struct lessThan {
inline bool operator()(const Event *l, const Event *r) { inline bool operator()(const Event *l, const Event *r) {
@ -43,10 +37,8 @@ public:
uint64_t mono_time; uint64_t mono_time;
cereal::Event::Which which; cereal::Event::Which which;
cereal::Event::Reader event; kj::ArrayPtr<const capnp::word> data;
capnp::FlatArrayMessageReader reader; int32_t eidx_segnum;
kj::ArrayPtr<const capnp::word> words;
bool frame;
}; };
class LogReader { class LogReader {
@ -59,9 +51,10 @@ public:
std::vector<Event*> events; std::vector<Event*> events;
private: private:
Event *newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum = -1);
bool parse(std::atomic<bool> *abort); bool parse(std::atomic<bool> *abort);
std::string raw_; std::string raw_;
#ifdef HAS_MEMORY_RESOURCE #ifdef HAS_MEMORY_RESOURCE
std::unique_ptr<std::pmr::monotonic_buffer_resource> mbr_; std::pmr::monotonic_buffer_resource mbr_{DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE * sizeof(Event)};
#endif #endif
}; };

@ -148,7 +148,9 @@ void Replay::buildTimeline() {
for (const Event *e : log->events) { for (const Event *e : log->events) {
if (e->which == cereal::Event::Which::CONTROLS_STATE) { if (e->which == cereal::Event::Which::CONTROLS_STATE) {
auto cs = e->event.getControlsState(); capnp::FlatArrayMessageReader reader(e->data);
auto event = reader.getRoot<cereal::Event>();
auto cs = event.getControlsState();
if (engaged != cs.getEnabled()) { if (engaged != cs.getEnabled()) {
if (engaged) { if (engaged) {
@ -232,6 +234,7 @@ void Replay::queueSegment() {
auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur))); auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit, std::distance(begin, segments_.end()))); auto end = std::next(begin, std::min<int>(segment_cache_limit, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit, std::distance(segments_.begin(), end)));
// load one segment at a time // load one segment at a time
auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); }); auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); });
if (it != end && !it->second) { if (it != end && !it->second) {
@ -316,7 +319,9 @@ void Replay::startStream(const Segment *cur_segment) {
auto it = std::find_if(events.cbegin(), events.cend(), auto it = std::find_if(events.cbegin(), events.cend(),
[](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); [](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
if (it != events.cend()) { if (it != events.cend()) {
uint64_t wall_time = (*it)->event.getInitData().getWallTimeNanos(); capnp::FlatArrayMessageReader reader((*it)->data);
auto event = reader.getRoot<cereal::Event>();
uint64_t wall_time = event.getInitData().getWallTimeNanos();
if (wall_time > 0) { if (wall_time > 0) {
route_date_time_ = QDateTime::fromMSecsSinceEpoch(wall_time / 1e6); route_date_time_ = QDateTime::fromMSecsSinceEpoch(wall_time / 1e6);
} }
@ -325,9 +330,11 @@ void Replay::startStream(const Segment *cur_segment) {
// write CarParams // write CarParams
it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; }); it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; });
if (it != events.end()) { if (it != events.end()) {
car_fingerprint_ = (*it)->event.getCarParams().getCarFingerprint(); capnp::FlatArrayMessageReader reader((*it)->data);
auto event = reader.getRoot<cereal::Event>();
car_fingerprint_ = event.getCarParams().getCarFingerprint();
capnp::MallocMessageBuilder builder; capnp::MallocMessageBuilder builder;
builder.setRoot((*it)->event.getCarParams()); builder.setRoot(event.getCarParams());
auto words = capnp::messageToFlatArray(builder); auto words = capnp::messageToFlatArray(builder);
auto bytes = words.asBytes(); auto bytes = words.asBytes();
Params().put("CarParams", (const char *)bytes.begin(), bytes.size()); Params().put("CarParams", (const char *)bytes.begin(), bytes.size());
@ -361,14 +368,16 @@ void Replay::publishMessage(const Event *e) {
if (event_filter && event_filter(e, filter_opaque)) return; if (event_filter && event_filter(e, filter_opaque)) return;
if (sm == nullptr) { if (sm == nullptr) {
auto bytes = e->bytes(); auto bytes = e->data.asBytes();
int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size()); int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) { if (ret == -1) {
rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]); rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
sockets_[e->which] = nullptr; sockets_[e->which] = nullptr;
} }
} else { } else {
sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], e->event}}); capnp::FlatArrayMessageReader reader(e->data);
auto event = reader.getRoot<cereal::Event>();
sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
} }
} }
@ -382,10 +391,13 @@ void Replay::publishFrame(const Event *e) {
(e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !hasFlag(REPLAY_FLAG_ECAM))) { (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !hasFlag(REPLAY_FLAG_ECAM))) {
return; return;
} }
auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentMerged(eidx.getSegmentNum())) { if (isSegmentMerged(e->eidx_segnum)) {
CameraType cam = cam_types.at(e->which); auto &segment = segments_.at(e->eidx_segnum);
camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx); auto cam = cam_types.at(e->which);
if (auto &frame = segment->frames[cam]; frame) {
camera_server_->pushFrame(cam, frame.get(), e);
}
} }
} }
@ -399,7 +411,7 @@ void Replay::stream() {
events_updated_ = false; events_updated_ = false;
if (exit_) break; if (exit_) break;
Event cur_event(cur_which, cur_mono_time_); Event cur_event{cur_which, cur_mono_time_, {}};
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) { if (eit == events_->end()) {
rInfo("waiting for events..."); rInfo("waiting for events...");
@ -430,7 +442,7 @@ void Replay::stream() {
precise_nano_sleep(behind_ns); precise_nano_sleep(behind_ns);
} }
if (!evt->frame) { if (evt->eidx_segnum == -1) {
publishMessage(evt); publishMessage(evt);
} else if (camera_server_) { } else if (camera_server_) {
if (speed_ > 1.0) { if (speed_ > 1.0) {

@ -178,7 +178,7 @@ void TestReplay::testSeekTo(int seek_to) {
continue; continue;
} }
Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_); Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_, {});
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) { if (eit == events_->end()) {
qDebug() << "waiting for events..."; qDebug() << "waiting for events...";

@ -298,6 +298,7 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool>
BZ2_bzDecompressEnd(&strm); BZ2_bzDecompressEnd(&strm);
if (bzerror == BZ_STREAM_END && !(abort && *abort)) { if (bzerror == BZ_STREAM_END && !(abort && *abort)) {
out.resize(strm.total_out_lo32); out.resize(strm.total_out_lo32);
out.shrink_to_fit();
return out; return out;
} }
return {}; return {};

Loading…
Cancel
Save