replay: send frames based on encodeIdx packet (#22361)

* send frames based on encodeIdx

* use start of frame time if set

* also use end of frame if set

* fix timestamp for encode packets

* handle all cameras

* add comment

* add twice
pull/22377/head
Willem Melching 4 years ago committed by GitHub
parent 318a8ba854
commit 6881688af2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      selfdrive/ui/replay/logreader.cc
  2. 34
      selfdrive/ui/replay/logreader.h
  3. 37
      selfdrive/ui/replay/replay.cc
  4. 1
      selfdrive/ui/replay/replay.h

@ -38,27 +38,20 @@ bool LogReader::load(const std::string &file) {
return false; return false;
} }
auto insertEidx = [&](CameraType type, const cereal::EncodeIndex::Reader &e) {
eidx[type][e.getFrameId()] = {e.getSegmentNum(), e.getSegmentId()};
};
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (words.size() > 0) { while (words.size() > 0) {
try { try {
std::unique_ptr<Event> evt = std::make_unique<Event>(words); std::unique_ptr<Event> evt = std::make_unique<Event>(words);
switch (evt->which) {
case cereal::Event::ROAD_ENCODE_IDX: // Add encodeIdx packet again as a frame packet for the video stream
insertEidx(RoadCam, evt->event.getRoadEncodeIdx()); if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
break; evt->which == cereal::Event::DRIVER_ENCODE_IDX ||
case cereal::Event::DRIVER_ENCODE_IDX: evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
insertEidx(DriverCam, evt->event.getDriverEncodeIdx());
break; std::unique_ptr<Event> frame_evt = std::make_unique<Event>(words, true);
case cereal::Event::WIDE_ROAD_ENCODE_IDX: events.push_back(frame_evt.release());
insertEidx(WideRoadCam, evt->event.getWideRoadEncodeIdx());
break;
default:
break;
} }
words = kj::arrayPtr(evt->reader.getEnd(), words.end()); words = kj::arrayPtr(evt->reader.getEnd(), words.end());
events.push_back(evt.release()); events.push_back(evt.release());
} catch (const kj::Exception &e) { } catch (const kj::Exception &e) {

@ -2,6 +2,7 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include <cassert>
#include <capnp/serialize.h> #include <capnp/serialize.h>
#include "cereal/gen/cpp/log.capnp.h" #include "cereal/gen/cpp/log.capnp.h"
@ -9,10 +10,7 @@
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS); const int MAX_CAMERAS = std::size(ALL_CAMERAS);
struct EncodeIdx {
int segmentNum;
uint32_t frameEncodeId;
};
class Event { class Event {
public: public:
Event(cereal::Event::Which which, uint64_t mono_time) : reader(kj::ArrayPtr<capnp::word>{}) { Event(cereal::Event::Which which, uint64_t mono_time) : reader(kj::ArrayPtr<capnp::word>{}) {
@ -20,11 +18,35 @@ public:
this->which = which; this->which = which;
this->mono_time = mono_time; this->mono_time = mono_time;
} }
Event(const kj::ArrayPtr<const capnp::word> &amsg) : reader(amsg) { Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame=false) : reader(amsg), frame(frame) {
words = kj::ArrayPtr<const capnp::word>(amsg.begin(), reader.getEnd()); words = kj::ArrayPtr<const capnp::word>(amsg.begin(), reader.getEnd());
event = reader.getRoot<cereal::Event>(); event = reader.getRoot<cereal::Event>();
which = event.which(); which = event.which();
mono_time = event.getLogMonoTime(); mono_time = event.getLogMonoTime();
// 1) Send video data at t=timestampEof/timestampSof
// 2) Send encodeIndex packet at t=logMonoTime
if (frame) {
cereal::EncodeIndex::Reader idx;
if (which == cereal::Event::ROAD_ENCODE_IDX) {
idx = event.getRoadEncodeIdx();
} else if (which == cereal::Event::DRIVER_ENCODE_IDX) {
idx = event.getDriverEncodeIdx();
} else if (which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
idx = event.getWideRoadEncodeIdx();
} else {
assert(false);
}
// C2 only has eof set, and some older routes have neither
uint64_t sof = idx.getTimestampSof();
uint64_t eof = idx.getTimestampEof();
if (sof > 0) {
mono_time = sof;
} else if (eof > 0) {
mono_time = eof;
}
}
} }
inline kj::ArrayPtr<const capnp::byte> bytes() const { return words.asBytes(); } inline kj::ArrayPtr<const capnp::byte> bytes() const { return words.asBytes(); }
@ -39,6 +61,7 @@ public:
cereal::Event::Reader event; cereal::Event::Reader event;
capnp::FlatArrayMessageReader reader; capnp::FlatArrayMessageReader reader;
kj::ArrayPtr<const capnp::word> words; kj::ArrayPtr<const capnp::word> words;
bool frame;
}; };
class LogReader { class LogReader {
@ -48,7 +71,6 @@ public:
bool load(const std::string &file); bool load(const std::string &file);
std::vector<Event*> events; std::vector<Event*> events;
std::unordered_map<uint32_t, EncodeIdx> eidx[MAX_CAMERAS] = {};
private: private:
std::vector<uint8_t> raw_; std::vector<uint8_t> raw_;

@ -92,7 +92,7 @@ void Replay::seekTo(int seconds, bool relative) {
cur_mono_time_ = route_start_ts_ + seconds * 1e9; cur_mono_time_ = route_start_ts_ + seconds * 1e9;
setCurrentSegment(segment); setCurrentSegment(segment);
bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end(); bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end();
// return false if segment changed and not loaded yet // return false if segment changed and not loaded yet
return !segment_changed || segment_loaded; return !segment_changed || segment_loaded;
}); });
@ -150,19 +150,14 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
// merge & sort events // merge & sort events
std::vector<Event *> *new_events = new std::vector<Event *>(); std::vector<Event *> *new_events = new std::vector<Event *>();
std::unordered_map<uint32_t, EncodeIdx> *new_eidx = new std::unordered_map<uint32_t, EncodeIdx>[MAX_CAMERAS];
for (int n : segments_need_merge) { for (int n : segments_need_merge) {
auto &log = segments_[n]->log; auto &log = segments_[n]->log;
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end()); auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end());
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan());
for (CameraType cam_type : ALL_CAMERAS) {
new_eidx[cam_type].insert(log->eidx[cam_type].begin(), log->eidx[cam_type].end());
}
} }
// update events // update events
auto prev_events = events_; auto prev_events = events_;
auto prev_eidx = eidx_;
updateEvents([&]() { updateEvents([&]() {
if (route_start_ts_ == 0) { if (route_start_ts_ == 0) {
// get route start time from initData // get route start time from initData
@ -174,12 +169,10 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
} }
events_ = new_events; events_ = new_events;
eidx_ = new_eidx;
segments_merged_ = segments_need_merge; segments_merged_ = segments_need_merge;
return true; return true;
}); });
delete prev_events; delete prev_events;
delete[] prev_eidx;
} }
// free segments out of current semgnt window. // free segments out of current semgnt window.
@ -239,14 +232,15 @@ void Replay::stream() {
} }
// publish frame // publish frame
// TODO: publish all frames if (evt->frame) {
if (evt->which == cereal::Event::ROAD_CAMERA_STATE) { // TODO: publish all frames
auto it_ = eidx_[RoadCam].find(evt->event.getRoadCameraState().getFrameId()); if (evt->which == cereal::Event::ROAD_ENCODE_IDX) {
if (it_ != eidx_[RoadCam].end()) { auto idx = evt->event.getRoadEncodeIdx();
EncodeIdx &e = it_->second; auto &seg = segments_[idx.getSegmentNum()];
auto &seg = segments_[e.segmentNum];
if (seg && seg->isLoaded()) { if (seg && seg->isLoaded() && idx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
auto &frm = seg->frames[RoadCam]; auto &frm = seg->frames[RoadCam];
if (vipc_server == nullptr) { if (vipc_server == nullptr) {
cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));
@ -257,7 +251,7 @@ void Replay::stream() {
vipc_server->start_listener(); vipc_server->start_listener();
} }
uint8_t *dat = frm->get(e.frameEncodeId); uint8_t *dat = frm->get(idx.getSegmentId());
if (dat) { if (dat) {
VisionIpcBufExtra extra = {}; VisionIpcBufExtra extra = {};
VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK);
@ -266,14 +260,15 @@ void Replay::stream() {
} }
} }
} }
}
// publish msg // publish msg
if (sm == nullptr) {
auto bytes = evt->bytes();
pm->send(type.c_str(), (capnp::byte *)bytes.begin(), bytes.size());
} else { } else {
sm->update_msgs(nanos_since_boot(), {{type, evt->event}}); if (sm == nullptr) {
auto bytes = evt->bytes();
pm->send(type.c_str(), (capnp::byte *)bytes.begin(), bytes.size());
} else {
sm->update_msgs(nanos_since_boot(), {{type, evt->event}});
}
} }
} }
} }

@ -48,7 +48,6 @@ protected:
uint64_t route_start_ts_ = 0; uint64_t route_start_ts_ = 0;
uint64_t cur_mono_time_ = 0; uint64_t cur_mono_time_ = 0;
std::vector<Event *> *events_ = nullptr; std::vector<Event *> *events_ = nullptr;
std::unordered_map<uint32_t, EncodeIdx> *eidx_ = nullptr;
std::vector<std::unique_ptr<Segment>> segments_; std::vector<std::unique_ptr<Segment>> segments_;
std::vector<int> segments_merged_; std::vector<int> segments_merged_;

Loading…
Cancel
Save