From a824bd75ef38d3e40341151dd8bf5fb1eac02f3c Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Sat, 20 Apr 2024 02:15:34 +0800 Subject: [PATCH] replay: refactor `Event` to remove the readers (#32252) Refactor struct Event to remove the MessageReader from it --- tools/cabana/streams/replaystream.cc | 8 +++- tools/cabana/videowidget.cc | 6 ++- tools/replay/SConscript | 2 - tools/replay/camera.cc | 14 ++++-- tools/replay/camera.h | 6 +-- tools/replay/logreader.cc | 68 +++++++++------------------- tools/replay/logreader.h | 19 +++----- tools/replay/replay.cc | 36 ++++++++++----- tools/replay/tests/test_replay.cc | 2 +- tools/replay/util.cc | 1 + 10 files changed, 76 insertions(+), 86 deletions(-) diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index aff0122b47..3fa8bb0fe9 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -36,7 +36,9 @@ void ReplayStream::mergeSegments() { for (auto it = seg->log->events.cbegin(); it != seg->log->events.cend(); ++it) { if ((*it)->which == cereal::Event::Which::CAN) { const uint64_t ts = (*it)->mono_time; - for (const auto &c : (*it)->event.getCan()) { + capnp::FlatArrayMessageReader reader((*it)->data); + auto event = reader.getRoot(); + for (const auto &c : event.getCan()) { new_events.push_back(newEvent(ts, c)); } } @@ -66,7 +68,9 @@ bool ReplayStream::eventFilter(const Event *event) { static double prev_update_ts = 0; if (event->which == cereal::Event::Which::CAN) { double current_sec = event->mono_time / 1e9 - routeStartTime(); - for (const auto &c : event->event.getCan()) { + capnp::FlatArrayMessageReader reader(event->data); + auto e = reader.getRoot(); + for (const auto &c : e.getCan()) { MessageId id = {.source = c.getSrc(), .address = c.getAddress()}; const auto dat = c.getDat(); updateEvent(id, current_sec, (const uint8_t*)dat.begin(), dat.size()); diff --git a/tools/cabana/videowidget.cc b/tools/cabana/videowidget.cc index a6fd0b2b64..ad20543755 100644 --- a/tools/cabana/videowidget.cc +++ b/tools/cabana/videowidget.cc @@ -263,7 +263,8 @@ void Slider::parseQLog(int segnum, std::shared_ptr qlog) { std::mutex mutex; QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event *e) { if (e->which == cereal::Event::Which::THUMBNAIL) { - auto thumb = e->event.getThumbnail(); + capnp::FlatArrayMessageReader reader(e->data); + auto thumb = reader.getRoot().getThumbnail(); auto data = thumb.getThumbnail(); if (QPixmap pm; pm.loadFromData(data.begin(), data.size(), "jpeg")) { QPixmap scaled = pm.scaledToHeight(MIN_VIDEO_HEIGHT - THUMBNAIL_MARGIN * 2, Qt::SmoothTransformation); @@ -271,7 +272,8 @@ void Slider::parseQLog(int segnum, std::shared_ptr qlog) { thumbnails[thumb.getTimestampEof()] = scaled; } } else if (e->which == cereal::Event::Which::CONTROLS_STATE) { - auto cs = e->event.getControlsState(); + capnp::FlatArrayMessageReader reader(e->data); + auto cs = reader.getRoot().getControlsState(); if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 && cs.getAlertSize() != cereal::ControlsState::AlertSize::NONE) { std::lock_guard lk(mutex); diff --git a/tools/replay/SConscript b/tools/replay/SConscript index db8447003b..5d88f560be 100644 --- a/tools/replay/SConscript +++ b/tools/replay/SConscript @@ -9,8 +9,6 @@ if arch == "Darwin": else: base_libs.append('OpenCL') -qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] - replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks) Export('replay_lib') diff --git a/tools/replay/camera.cc b/tools/replay/camera.cc index 49f3010c6c..9a023db6fa 100644 --- a/tools/replay/camera.cc +++ b/tools/replay/camera.cc @@ -1,7 +1,8 @@ #include "tools/replay/camera.h" +#include + #include -#include #include "third_party/linux/include/msm_media_info.h" #include "tools/replay/util.h" @@ -57,9 +58,14 @@ void CameraServer::cameraThread(Camera &cam) { }; while (true) { - const auto [fr, eidx] = cam.queue.pop(); + const auto [fr, event] = cam.queue.pop(); if (!fr) break; + capnp::FlatArrayMessageReader reader(event->data); + auto evt = reader.getRoot(); + auto eidx = capnp::AnyStruct::Reader(evt).getPointerSection()[0].getAs(); + if (eidx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C) continue; + const int id = eidx.getSegmentId(); bool prefetched = (id == cam.cached_id && eidx.getSegmentNum() == cam.cached_seg); auto yuv = prefetched ? cam.cached_buf : read_frame(fr, id); @@ -83,7 +89,7 @@ void CameraServer::cameraThread(Camera &cam) { } } -void CameraServer::pushFrame(CameraType type, FrameReader *fr, const cereal::EncodeIndex::Reader &eidx) { +void CameraServer::pushFrame(CameraType type, FrameReader *fr, const Event *event) { auto &cam = cameras_[type]; if (cam.width != fr->width || cam.height != fr->height) { cam.width = fr->width; @@ -93,7 +99,7 @@ void CameraServer::pushFrame(CameraType type, FrameReader *fr, const cereal::Enc } ++publishing_; - cam.queue.push({fr, eidx}); + cam.queue.push({fr, event}); } void CameraServer::waitForSent() { diff --git a/tools/replay/camera.h b/tools/replay/camera.h index 9f43c5a362..436423ac72 100644 --- a/tools/replay/camera.h +++ b/tools/replay/camera.h @@ -1,7 +1,5 @@ #pragma once -#include - #include #include #include @@ -17,7 +15,7 @@ class CameraServer { public: CameraServer(std::pair camera_size[MAX_CAMERAS] = nullptr); ~CameraServer(); - void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx); + void pushFrame(CameraType type, FrameReader* fr, const Event *event); void waitForSent(); protected: @@ -27,7 +25,7 @@ protected: int width; int height; std::thread thread; - SafeQueue> queue; + SafeQueue> queue; int cached_id = -1; int cached_seg = -1; VisionBuf * cached_buf; diff --git a/tools/replay/logreader.cc b/tools/replay/logreader.cc index c92ff4753f..36b07f19d0 100644 --- a/tools/replay/logreader.cc +++ b/tools/replay/logreader.cc @@ -4,34 +4,7 @@ #include "tools/replay/filereader.h" #include "tools/replay/util.h" -Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(amsg), frame(frame) { - words = kj::ArrayPtr(amsg.begin(), reader.getEnd()); - event = reader.getRoot(); - which = event.which(); - mono_time = event.getLogMonoTime(); - - // 1) Send video data at t=timestampEof/timestampSof - // 2) Send encodeIndex packet at t=logMonoTime - if (frame) { - auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs(); - // C2 only has eof set, and some older routes have neither - uint64_t sof = idx.getTimestampSof(); - uint64_t eof = idx.getTimestampEof(); - if (sof > 0) { - mono_time = sof; - } else if (eof > 0) { - mono_time = eof; - } - } -} - -// class LogReader - LogReader::LogReader(size_t memory_pool_block_size) { -#ifdef HAS_MEMORY_RESOURCE - const size_t buf_size = sizeof(Event) * memory_pool_block_size; - mbr_ = std::make_unique(buf_size); -#endif events.reserve(memory_pool_block_size); } @@ -61,33 +34,28 @@ bool LogReader::parse(std::atomic *abort) { try { kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); while (words.size() > 0 && !(abort && *abort)) { -#ifdef HAS_MEMORY_RESOURCE - Event *evt = new (mbr_.get()) Event(words); -#else - Event *evt = new Event(words); -#endif + capnp::FlatArrayMessageReader reader(words); + auto event = reader.getRoot(); + auto which = event.which(); + uint64_t mono_time = event.getLogMonoTime(); + auto event_data = kj::arrayPtr(words.begin(), reader.getEnd()); + + Event *evt = events.emplace_back(newEvent(which, mono_time, event_data)); // Add encodeIdx packet again as a frame packet for the video stream if (evt->which == cereal::Event::ROAD_ENCODE_IDX || evt->which == cereal::Event::DRIVER_ENCODE_IDX || evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { - -#ifdef HAS_MEMORY_RESOURCE - Event *frame_evt = new (mbr_.get()) Event(words, true); -#else - Event *frame_evt = new Event(words, true); -#endif - - events.push_back(frame_evt); + auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs(); + if (uint64_t sof = idx.getTimestampSof()) { + mono_time = sof; + } + events.emplace_back(newEvent(which, mono_time, event_data, idx.getSegmentNum())); } - words = kj::arrayPtr(evt->reader.getEnd(), words.end()); - events.push_back(evt); + words = kj::arrayPtr(reader.getEnd(), words.end()); } } catch (const kj::Exception &e) { - rWarning("failed to parse log : %s", e.getDescription().cStr()); - if (!events.empty()) { - rWarning("read %zu events from corrupt log", events.size()); - } + rWarning("Failed to parse log : %s.\nRetrieved %zu events from corrupt log", e.getDescription().cStr(), events.size()); } if (!events.empty() && !(abort && *abort)) { @@ -96,3 +64,11 @@ bool LogReader::parse(std::atomic *abort) { } return false; } + +Event *LogReader::newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr &words, int eidx_segnum) { +#ifdef HAS_MEMORY_RESOURCE + return new (&mbr_) Event(which, mono_time, words, eidx_segnum); +#else + return new Event(which, mono_time, words, eidx_segnum); +#endif +} diff --git a/tools/replay/logreader.h b/tools/replay/logreader.h index 73f822d16c..2a28d7b432 100644 --- a/tools/replay/logreader.h +++ b/tools/replay/logreader.h @@ -4,7 +4,6 @@ #define HAS_MEMORY_RESOURCE 1 #include #endif - #include #include #include @@ -18,13 +17,8 @@ const int DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE = 65000; class Event { public: - Event(cereal::Event::Which which, uint64_t mono_time) : reader(kj::ArrayPtr{}) { - // construct a dummy Event for binary search, e.g std::upper_bound - this->which = which; - this->mono_time = mono_time; - } - Event(const kj::ArrayPtr &amsg, bool frame = false); - inline kj::ArrayPtr bytes() const { return words.asBytes(); } + Event(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr &data, int eidx_segnum = -1) + : which(which), mono_time(mono_time), data(data), eidx_segnum(eidx_segnum) {} struct lessThan { inline bool operator()(const Event *l, const Event *r) { @@ -43,10 +37,8 @@ public: uint64_t mono_time; cereal::Event::Which which; - cereal::Event::Reader event; - capnp::FlatArrayMessageReader reader; - kj::ArrayPtr words; - bool frame; + kj::ArrayPtr data; + int32_t eidx_segnum; }; class LogReader { @@ -59,9 +51,10 @@ public: std::vector events; private: + Event *newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr &words, int eidx_segnum = -1); bool parse(std::atomic *abort); std::string raw_; #ifdef HAS_MEMORY_RESOURCE - std::unique_ptr mbr_; + std::pmr::monotonic_buffer_resource mbr_{DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE * sizeof(Event)}; #endif }; diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index ae148f1a5b..2e50722551 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -148,7 +148,9 @@ void Replay::buildTimeline() { for (const Event *e : log->events) { if (e->which == cereal::Event::Which::CONTROLS_STATE) { - auto cs = e->event.getControlsState(); + capnp::FlatArrayMessageReader reader(e->data); + auto event = reader.getRoot(); + auto cs = event.getControlsState(); if (engaged != cs.getEnabled()) { if (engaged) { @@ -232,6 +234,7 @@ void Replay::queueSegment() { auto begin = std::prev(cur, std::min(segment_cache_limit / 2, std::distance(segments_.begin(), cur))); auto end = std::next(begin, std::min(segment_cache_limit, std::distance(begin, segments_.end()))); + begin = std::prev(end, std::min(segment_cache_limit, std::distance(segments_.begin(), end))); // load one segment at a time auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); }); if (it != end && !it->second) { @@ -316,7 +319,9 @@ void Replay::startStream(const Segment *cur_segment) { auto it = std::find_if(events.cbegin(), events.cend(), [](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); if (it != events.cend()) { - uint64_t wall_time = (*it)->event.getInitData().getWallTimeNanos(); + capnp::FlatArrayMessageReader reader((*it)->data); + auto event = reader.getRoot(); + uint64_t wall_time = event.getInitData().getWallTimeNanos(); if (wall_time > 0) { route_date_time_ = QDateTime::fromMSecsSinceEpoch(wall_time / 1e6); } @@ -325,9 +330,11 @@ void Replay::startStream(const Segment *cur_segment) { // write CarParams it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; }); if (it != events.end()) { - car_fingerprint_ = (*it)->event.getCarParams().getCarFingerprint(); + capnp::FlatArrayMessageReader reader((*it)->data); + auto event = reader.getRoot(); + car_fingerprint_ = event.getCarParams().getCarFingerprint(); capnp::MallocMessageBuilder builder; - builder.setRoot((*it)->event.getCarParams()); + builder.setRoot(event.getCarParams()); auto words = capnp::messageToFlatArray(builder); auto bytes = words.asBytes(); Params().put("CarParams", (const char *)bytes.begin(), bytes.size()); @@ -361,14 +368,16 @@ void Replay::publishMessage(const Event *e) { if (event_filter && event_filter(e, filter_opaque)) return; if (sm == nullptr) { - auto bytes = e->bytes(); + auto bytes = e->data.asBytes(); int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size()); if (ret == -1) { rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]); sockets_[e->which] = nullptr; } } else { - sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], e->event}}); + capnp::FlatArrayMessageReader reader(e->data); + auto event = reader.getRoot(); + sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}}); } } @@ -382,10 +391,13 @@ void Replay::publishFrame(const Event *e) { (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !hasFlag(REPLAY_FLAG_ECAM))) { return; } - auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs(); - if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentMerged(eidx.getSegmentNum())) { - CameraType cam = cam_types.at(e->which); - camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx); + + if (isSegmentMerged(e->eidx_segnum)) { + auto &segment = segments_.at(e->eidx_segnum); + auto cam = cam_types.at(e->which); + if (auto &frame = segment->frames[cam]; frame) { + camera_server_->pushFrame(cam, frame.get(), e); + } } } @@ -399,7 +411,7 @@ void Replay::stream() { events_updated_ = false; if (exit_) break; - Event cur_event(cur_which, cur_mono_time_); + Event cur_event{cur_which, cur_mono_time_, {}}; auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); if (eit == events_->end()) { rInfo("waiting for events..."); @@ -430,7 +442,7 @@ void Replay::stream() { precise_nano_sleep(behind_ns); } - if (!evt->frame) { + if (evt->eidx_segnum == -1) { publishMessage(evt); } else if (camera_server_) { if (speed_ > 1.0) { diff --git a/tools/replay/tests/test_replay.cc b/tools/replay/tests/test_replay.cc index 92510053ef..a681f347bb 100644 --- a/tools/replay/tests/test_replay.cc +++ b/tools/replay/tests/test_replay.cc @@ -178,7 +178,7 @@ void TestReplay::testSeekTo(int seek_to) { continue; } - Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_); + Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_, {}); auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); if (eit == events_->end()) { qDebug() << "waiting for events..."; diff --git a/tools/replay/util.cc b/tools/replay/util.cc index acc018fdb4..deb6293745 100644 --- a/tools/replay/util.cc +++ b/tools/replay/util.cc @@ -298,6 +298,7 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic BZ2_bzDecompressEnd(&strm); if (bzerror == BZ_STREAM_END && !(abort && *abort)) { out.resize(strm.total_out_lo32); + out.shrink_to_fit(); return out; } return {};