replay: simplify the code for allow/block list (#30449)

simplify allow/block list
old-commit-hash: da95fd3019
testing-closet
Dean Lee 1 year ago committed by GitHub
parent c668054d6f
commit 543ea2bf57
  1. 2
      tools/cabana/streams/replaystream.cc
  2. 16
      tools/replay/SConscript
  3. 17
      tools/replay/logreader.cc
  4. 6
      tools/replay/logreader.h
  5. 5
      tools/replay/main.cc
  6. 83
      tools/replay/replay.cc
  7. 8
      tools/replay/replay.h
  8. 9
      tools/replay/route.cc
  9. 4
      tools/replay/route.h
  10. 2
      tools/replay/tests/test_replay.cc

@ -46,7 +46,7 @@ void ReplayStream::mergeSegments() {
bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) { bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) {
replay.reset(new Replay(route, {"can", "roadEncodeIdx", "driverEncodeIdx", "wideRoadEncodeIdx", "carParams"}, replay.reset(new Replay(route, {"can", "roadEncodeIdx", "driverEncodeIdx", "wideRoadEncodeIdx", "carParams"},
{}, {}, nullptr, replay_flags, data_dir, this)); {}, nullptr, replay_flags, data_dir, this));
replay->setSegmentCacheLimit(settings.max_cached_minutes); replay->setSegmentCacheLimit(settings.max_cached_minutes);
replay->installEventFilter(event_filter, this); replay->installEventFilter(event_filter, this);
QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo); QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);

@ -1,25 +1,21 @@
import os Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc', 'cereal')
Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc',
'cereal', 'transformations')
base_frameworks = qt_env['FRAMEWORKS'] base_frameworks = qt_env['FRAMEWORKS']
base_libs = [common, messaging, cereal, visionipc, transformations, 'zmq', base_libs = [common, messaging, cereal, visionipc, 'zmq',
'capnp', 'kj', 'm', 'ssl', 'crypto', 'pthread'] + qt_env["LIBS"] 'capnp', 'kj', 'm', 'ssl', 'crypto', 'pthread', 'qt_util'] + qt_env["LIBS"]
if arch == "Darwin": if arch == "Darwin":
base_frameworks.append('OpenCL') base_frameworks.append('OpenCL')
else: else:
base_libs.append('OpenCL') base_libs.append('OpenCL')
qt_libs = ['qt_util'] + base_libs
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"] replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=qt_libs, FRAMEWORKS=base_frameworks)
Export('replay_lib') Export('replay_lib')
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + qt_libs replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + base_libs
qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks) qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks)
if GetOption('extras'): if GetOption('extras'):
qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, qt_libs]) qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, base_libs])

@ -1,6 +1,7 @@
#include "tools/replay/logreader.h" #include "tools/replay/logreader.h"
#include <algorithm> #include <algorithm>
#include "tools/replay/filereader.h"
#include "tools/replay/util.h" #include "tools/replay/util.h"
Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) { Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
@ -40,9 +41,7 @@ LogReader::~LogReader() {
} }
} }
bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
const std::set<cereal::Event::Which> &allow,
bool local_cache, int chunk_size, int retries) {
raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort); raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort);
if (raw_.empty()) return false; if (raw_.empty()) return false;
@ -50,15 +49,15 @@ bool LogReader::load(const std::string &url, std::atomic<bool> *abort,
raw_ = decompressBZ2(raw_, abort); raw_ = decompressBZ2(raw_, abort);
if (raw_.empty()) return false; if (raw_.empty()) return false;
} }
return parse(allow, abort); return parse(abort);
} }
bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) { bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) {
raw_.assign((const char *)data, size); raw_.assign((const char *)data, size);
return parse({}, abort); return parse(abort);
} }
bool LogReader::parse(const std::set<cereal::Event::Which> &allow, std::atomic<bool> *abort) { bool LogReader::parse(std::atomic<bool> *abort) {
try { try {
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (words.size() > 0 && !(abort && *abort)) { while (words.size() > 0 && !(abort && *abort)) {
@ -67,12 +66,6 @@ bool LogReader::parse(const std::set<cereal::Event::Which> &allow, std::atomic<b
#else #else
Event *evt = new Event(words); Event *evt = new Event(words);
#endif #endif
if (!allow.empty() && allow.find(evt->which) == allow.end()) {
words = kj::arrayPtr(evt->reader.getEnd(), words.end());
delete evt;
continue;
}
// Add encodeIdx packet again as a frame packet for the video stream // Add encodeIdx packet again as a frame packet for the video stream
if (evt->which == cereal::Event::ROAD_ENCODE_IDX || if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
evt->which == cereal::Event::DRIVER_ENCODE_IDX || evt->which == cereal::Event::DRIVER_ENCODE_IDX ||

@ -6,13 +6,11 @@
#endif #endif
#include <memory> #include <memory>
#include <set>
#include <string> #include <string>
#include <vector> #include <vector>
#include "cereal/gen/cpp/log.capnp.h" #include "cereal/gen/cpp/log.capnp.h"
#include "system/camerad/cameras/camera_common.h" #include "system/camerad/cameras/camera_common.h"
#include "tools/replay/filereader.h"
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS); const int MAX_CAMERAS = std::size(ALL_CAMERAS);
@ -55,13 +53,13 @@ class LogReader {
public: public:
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE); LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
~LogReader(); ~LogReader();
bool load(const std::string &url, std::atomic<bool> *abort = nullptr, const std::set<cereal::Event::Which> &allow = {}, bool load(const std::string &url, std::atomic<bool> *abort = nullptr,
bool local_cache = false, int chunk_size = -1, int retries = 0); bool local_cache = false, int chunk_size = -1, int retries = 0);
bool load(const std::byte *data, size_t size, std::atomic<bool> *abort = nullptr); bool load(const std::byte *data, size_t size, std::atomic<bool> *abort = nullptr);
std::vector<Event*> events; std::vector<Event*> events;
private: private:
bool parse(const std::set<cereal::Event::Which> &allow, std::atomic<bool> *abort); bool parse(std::atomic<bool> *abort);
std::string raw_; std::string raw_;
#ifdef HAS_MEMORY_RESOURCE #ifdef HAS_MEMORY_RESOURCE
std::unique_ptr<std::pmr::monotonic_buffer_resource> mbr_; std::unique_ptr<std::pmr::monotonic_buffer_resource> mbr_;

@ -13,7 +13,6 @@ int main(int argc, char *argv[]) {
QCoreApplication app(argc, argv); QCoreApplication app(argc, argv);
const QStringList base_blacklist = {"uiDebug", "userFlag"};
const std::tuple<QString, REPLAY_FLAGS, QString> flags[] = { const std::tuple<QString, REPLAY_FLAGS, QString> flags[] = {
{"dcam", REPLAY_FLAG_DCAM, "load driver camera"}, {"dcam", REPLAY_FLAG_DCAM, "load driver camera"},
{"ecam", REPLAY_FLAG_ECAM, "load wide road camera"}, {"ecam", REPLAY_FLAG_ECAM, "load wide road camera"},
@ -22,7 +21,7 @@ int main(int argc, char *argv[]) {
{"qcam", REPLAY_FLAG_QCAMERA, "load qcamera"}, {"qcam", REPLAY_FLAG_QCAMERA, "load qcamera"},
{"no-hw-decoder", REPLAY_FLAG_NO_HW_DECODER, "disable HW video decoding"}, {"no-hw-decoder", REPLAY_FLAG_NO_HW_DECODER, "disable HW video decoding"},
{"no-vipc", REPLAY_FLAG_NO_VIPC, "do not output video"}, {"no-vipc", REPLAY_FLAG_NO_VIPC, "do not output video"},
{"all", REPLAY_FLAG_ALL_SERVICES, "do output all messages including " + base_blacklist.join(", ") + {"all", REPLAY_FLAG_ALL_SERVICES, "do output all messages including uiDebug, userFlag"
". this may causes issues when used along with UI"} ". this may causes issues when used along with UI"}
}; };
@ -64,7 +63,7 @@ int main(int argc, char *argv[]) {
op_prefix.reset(new OpenpilotPrefix(prefix.toStdString())); op_prefix.reset(new OpenpilotPrefix(prefix.toStdString()));
} }
Replay *replay = new Replay(route, allow, block, base_blacklist, nullptr, replay_flags, parser.value("data_dir"), &app); Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
if (!parser.value("c").isEmpty()) { if (!parser.value("c").isEmpty()) {
replay->setSegmentCacheLimit(parser.value("c").toInt()); replay->setSegmentCacheLimit(parser.value("c").toInt());
} }

@ -9,35 +9,23 @@
#include "common/timing.h" #include "common/timing.h"
#include "tools/replay/util.h" #include "tools/replay/util.h"
Replay::Replay(QString route, QStringList allow, QStringList block, QStringList base_blacklist, SubMaster *sm_, uint32_t flags, QString data_dir, QObject *parent) Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_,
: sm(sm_), flags_(flags), QObject(parent) { uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
std::vector<const char *> s; if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
block << "uiDebug" << "userFlag";
}
auto event_struct = capnp::Schema::from<cereal::Event>().asStruct(); auto event_struct = capnp::Schema::from<cereal::Event>().asStruct();
sockets_.resize(event_struct.getUnionFields().size()); sockets_.resize(event_struct.getUnionFields().size());
for (const auto &it : services) { for (const auto &[name, _] : services) {
auto name = it.second.name.c_str(); if (!block.contains(name.c_str()) && (allow.empty() || allow.contains(name.c_str()))) {
uint16_t which = event_struct.getFieldByName(name).getProto().getDiscriminantValue(); uint16_t which = event_struct.getFieldByName(name).getProto().getDiscriminantValue();
if ((which == cereal::Event::Which::UI_DEBUG || which == cereal::Event::Which::USER_FLAG) && sockets_[which] = name.c_str();
!(flags & REPLAY_FLAG_ALL_SERVICES) &&
!allow.contains(name)) {
continue;
}
if ((allow.empty() || allow.contains(name)) && !block.contains(name)) {
sockets_[which] = name;
if (!allow.empty() || !block.empty()) {
allow_list.insert((cereal::Event::Which)which);
}
s.push_back(name);
} }
} }
if (!allow_list.empty()) { std::vector<const char *> s;
// the following events are needed for replay to work properly. std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(s),
allow_list.insert(cereal::Event::Which::INIT_DATA); [](const char *name) { return name != nullptr; });
allow_list.insert(cereal::Event::Which::CAR_PARAMS);
}
qDebug() << "services " << s; qDebug() << "services " << s;
qDebug() << "loading route " << route; qDebug() << "loading route " << route;
@ -150,7 +138,7 @@ void Replay::buildTimeline() {
const auto &route_segments = route_->segments(); const auto &route_segments = route_->segments();
for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) { for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) {
std::shared_ptr<LogReader> log(new LogReader()); std::shared_ptr<LogReader> log(new LogReader());
if (!log->load(it->second.qlog.toStdString(), &exit_, {}, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue; if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue;
for (const Event *e : log->events) { for (const Event *e : log->events) {
if (e->which == cereal::Event::Which::CONTROLS_STATE) { if (e->which == cereal::Event::Which::CONTROLS_STATE) {
@ -233,30 +221,17 @@ void Replay::segmentLoadFinished(bool success) {
} }
void Replay::queueSegment() { void Replay::queueSegment() {
if (segments_.empty()) return; auto cur = segments_.lower_bound(current_segment_.load());
if (cur == segments_.end()) return;
SegmentMap::iterator begin, cur;
begin = cur = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
int distance = std::max<int>(std::ceil(segment_cache_limit / 2.0) - 1, segment_cache_limit - std::distance(cur, segments_.end()));
for (int i = 0; begin != segments_.begin() && i < distance; ++i) {
--begin;
}
auto end = begin;
for (int i = 0; end != segments_.end() && i < segment_cache_limit; ++i) {
++end;
}
auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit, segments_.size()));
// load one segment at a time // load one segment at a time
for (auto it = cur; it != end; ++it) { auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); });
auto &[n, seg] = *it; if (it != end && !it->second) {
if ((seg && !seg->isLoaded()) || !seg) { rDebug("loading segment %d...", it->first);
if (!seg) { it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
rDebug("loading segment %d...", n); QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
seg = std::make_unique<Segment>(n, route_->at(n), flags_, allow_list);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
}
break;
}
} }
mergeSegments(begin, end); mergeSegments(begin, end);
@ -293,13 +268,11 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
new_events_->clear(); new_events_->clear();
new_events_->reserve(new_events_size); new_events_->reserve(new_events_size);
for (int n : segments_need_merge) { for (int n : segments_need_merge) {
const auto &e = segments_[n]->log->events; size_t size = new_events_->size();
if (e.size() > 0) { const auto &events = segments_[n]->log->events;
auto insert_from = e.begin(); std::copy_if(events.begin(), events.end(), std::back_inserter(*new_events_),
if (new_events_->size() > 0 && (*insert_from)->which == cereal::Event::Which::INIT_DATA) ++insert_from; [this](auto e) { return e->which < sockets_.size() && sockets_[e->which] != nullptr; });
auto middle = new_events_->insert(new_events_->end(), insert_from, e.end()); std::inplace_merge(new_events_->begin(), new_events_->begin() + size, new_events_->end(), Event::lessThan());
std::inplace_merge(new_events_->begin(), middle, new_events_->end(), Event::lessThan());
}
} }
if (stream_thread_) { if (stream_thread_) {
@ -414,7 +387,7 @@ void Replay::stream() {
cur_mono_time_ = evt->mono_time; cur_mono_time_ = evt->mono_time;
setCurrentSegment(toSeconds(cur_mono_time_) / 60); setCurrentSegment(toSeconds(cur_mono_time_) / 60);
if (cur_which < sockets_.size() && sockets_[cur_which] != nullptr) { if (sockets_[cur_which] != nullptr) {
// keep time // keep time
long etime = (cur_mono_time_ - evt_start_ts) / speed_; long etime = (cur_mono_time_ - evt_start_ts) / speed_;
long rtime = nanos_since_boot() - loop_start_ts; long rtime = nanos_since_boot() - loop_start_ts;

@ -4,7 +4,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <set>
#include <string> #include <string>
#include <tuple> #include <tuple>
#include <vector> #include <vector>
@ -50,8 +49,8 @@ class Replay : public QObject {
Q_OBJECT Q_OBJECT
public: public:
Replay(QString route, QStringList allow, QStringList block, QStringList base_blacklist, SubMaster *sm = nullptr, Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr,
uint32_t flags = REPLAY_FLAG_NONE, QString data_dir = "", QObject *parent = 0); uint32_t flags = REPLAY_FLAG_NONE, QString data_dir = "", QObject *parent = 0);
~Replay(); ~Replay();
bool load(); bool load();
void start(int seconds = 0); void start(int seconds = 0);
@ -114,8 +113,6 @@ protected:
} }
QThread *stream_thread_ = nullptr; QThread *stream_thread_ = nullptr;
// logs
std::mutex stream_lock_; std::mutex stream_lock_;
std::condition_variable stream_cv_; std::condition_variable stream_cv_;
std::atomic<bool> updating_events_ = false; std::atomic<bool> updating_events_ = false;
@ -142,7 +139,6 @@ protected:
std::mutex timeline_lock; std::mutex timeline_lock;
QFuture<void> timeline_future; QFuture<void> timeline_future;
std::vector<std::tuple<double, double, TimelineType>> timeline; std::vector<std::tuple<double, double, TimelineType>> timeline;
std::set<cereal::Event::Which> allow_list;
std::string car_fingerprint_; std::string car_fingerprint_;
std::atomic<float> speed_ = 1.0; std::atomic<float> speed_ = 1.0;
replayEventFilter event_filter = nullptr; replayEventFilter event_filter = nullptr;

@ -7,9 +7,6 @@
#include <QRegExp> #include <QRegExp>
#include <QtConcurrent> #include <QtConcurrent>
#include <array> #include <array>
#include <memory>
#include <set>
#include <string>
#include "selfdrive/ui/qt/api.h" #include "selfdrive/ui/qt/api.h"
#include "system/hardware/hw.h" #include "system/hardware/hw.h"
@ -102,9 +99,7 @@ void Route::addFileToSegment(int n, const QString &file) {
// class Segment // class Segment
Segment::Segment(int n, const SegmentFile &files, uint32_t flags, Segment::Segment(int n, const SegmentFile &files, uint32_t flags) : seg_num(n), flags(flags) {
const std::set<cereal::Event::Which> &allow)
: seg_num(n), flags(flags), allow(allow) {
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog // [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const std::array file_list = { const std::array file_list = {
(flags & REPLAY_FLAG_QCAMERA) || files.road_cam.isEmpty() ? files.qcamera : files.road_cam, (flags & REPLAY_FLAG_QCAMERA) || files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
@ -135,7 +130,7 @@ void Segment::loadFile(int id, const std::string file) {
success = frames[id]->load(file, flags & REPLAY_FLAG_NO_HW_DECODER, &abort_, local_cache, 20 * 1024 * 1024, 3); success = frames[id]->load(file, flags & REPLAY_FLAG_NO_HW_DECODER, &abort_, local_cache, 20 * 1024 * 1024, 3);
} else { } else {
log = std::make_unique<LogReader>(); log = std::make_unique<LogReader>();
success = log->load(file, &abort_, allow, local_cache, 0, 3); success = log->load(file, &abort_, local_cache, 0, 3);
} }
if (!success) { if (!success) {

@ -2,7 +2,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <set>
#include <string> #include <string>
#include <QDateTime> #include <QDateTime>
@ -55,7 +54,7 @@ class Segment : public QObject {
Q_OBJECT Q_OBJECT
public: public:
Segment(int n, const SegmentFile &files, uint32_t flags, const std::set<cereal::Event::Which> &allow = {}); Segment(int n, const SegmentFile &files, uint32_t flags);
~Segment(); ~Segment();
inline bool isLoaded() const { return !loading_ && !abort_; } inline bool isLoaded() const { return !loading_ && !abort_; }
@ -73,5 +72,4 @@ protected:
std::atomic<int> loading_ = 0; std::atomic<int> loading_ = 0;
QFutureSynchronizer<void> synchronizer_; QFutureSynchronizer<void> synchronizer_;
uint32_t flags; uint32_t flags;
std::set<cereal::Event::Which> allow;
}; };

@ -161,7 +161,7 @@ TEST_CASE("Remote route") {
// helper class for unit tests // helper class for unit tests
class TestReplay : public Replay { class TestReplay : public Replay {
public: public:
TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, {}, nullptr, flags) {} TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {}
void test_seek(); void test_seek();
void testSeekTo(int seek_to); void testSeekTo(int seek_to);
}; };

Loading…
Cancel
Save