|
|
|
@ -6,6 +6,7 @@ |
|
|
|
|
#include <capnp/dynamic.h> |
|
|
|
|
#include "cereal/services.h" |
|
|
|
|
#include "selfdrive/common/timing.h" |
|
|
|
|
#include "selfdrive/common/params.h" |
|
|
|
|
#include "selfdrive/hardware/hw.h" |
|
|
|
|
#include "selfdrive/ui/replay/util.h" |
|
|
|
|
|
|
|
|
@ -15,11 +16,10 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s |
|
|
|
|
auto event_struct = capnp::Schema::from<cereal::Event>().asStruct(); |
|
|
|
|
sockets_.resize(event_struct.getUnionFields().size()); |
|
|
|
|
for (const auto &it : services) { |
|
|
|
|
if ((allow.size() == 0 || allow.contains(it.name)) && |
|
|
|
|
!block.contains(it.name)) { |
|
|
|
|
s.push_back(it.name); |
|
|
|
|
if ((allow.empty() || allow.contains(it.name)) && !block.contains(it.name)) { |
|
|
|
|
uint16_t which = event_struct.getFieldByName(it.name).getProto().getDiscriminantValue(); |
|
|
|
|
sockets_[which] = it.name; |
|
|
|
|
s.push_back(it.name); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
qDebug() << "services " << s; |
|
|
|
@ -29,15 +29,14 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s |
|
|
|
|
} |
|
|
|
|
route_ = std::make_unique<Route>(route, data_dir); |
|
|
|
|
events_ = new std::vector<Event *>(); |
|
|
|
|
// doSeek & queueSegment are always executed in the same thread
|
|
|
|
|
|
|
|
|
|
connect(this, &Replay::seekTo, this, &Replay::doSeek); |
|
|
|
|
connect(this, &Replay::segmentChanged, this, &Replay::queueSegment); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Replay::~Replay() { |
|
|
|
|
qDebug() << "shutdown: in progress..."; |
|
|
|
|
exit_ = true; |
|
|
|
|
updating_events_ = true; |
|
|
|
|
exit_ = updating_events_ = true; |
|
|
|
|
if (stream_thread_) { |
|
|
|
|
stream_cv_.notify_one(); |
|
|
|
|
stream_thread_->quit(); |
|
|
|
@ -72,12 +71,6 @@ bool Replay::load() { |
|
|
|
|
|
|
|
|
|
void Replay::start(int seconds) { |
|
|
|
|
seekTo(seconds, false); |
|
|
|
|
|
|
|
|
|
camera_server_ = std::make_unique<CameraServer>(); |
|
|
|
|
// start stream thread
|
|
|
|
|
stream_thread_ = new QThread(this); |
|
|
|
|
QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); }); |
|
|
|
|
stream_thread_->start(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Replay::updateEvents(const std::function<bool()> &lambda) { |
|
|
|
@ -133,15 +126,16 @@ void Replay::segmentLoadFinished(bool success) { |
|
|
|
|
|
|
|
|
|
void Replay::queueSegment() { |
|
|
|
|
// get the current segment window
|
|
|
|
|
SegmentMap::iterator begin, end; |
|
|
|
|
begin = end = segments_.lower_bound(current_segment_); |
|
|
|
|
SegmentMap::iterator begin, cur, end; |
|
|
|
|
begin = cur = end = segments_.lower_bound(current_segment_); |
|
|
|
|
for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) { |
|
|
|
|
--begin; |
|
|
|
|
} |
|
|
|
|
for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) { |
|
|
|
|
++end; |
|
|
|
|
} |
|
|
|
|
// load segments
|
|
|
|
|
|
|
|
|
|
// load & merge segments
|
|
|
|
|
for (auto it = begin; it != end; ++it) { |
|
|
|
|
auto &[n, seg] = *it; |
|
|
|
|
if (!seg) { |
|
|
|
@ -150,8 +144,8 @@ void Replay::queueSegment() { |
|
|
|
|
qInfo() << "loading segment" << n << "..."; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// merge segments
|
|
|
|
|
mergeSegments(begin, end); |
|
|
|
|
|
|
|
|
|
// free segments out of current semgnt window.
|
|
|
|
|
for (auto it = segments_.begin(); it != begin; ++it) { |
|
|
|
|
it->second.reset(nullptr); |
|
|
|
@ -159,6 +153,11 @@ void Replay::queueSegment() { |
|
|
|
|
for (auto it = end; it != segments_.end(); ++it) { |
|
|
|
|
it->second.reset(nullptr); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// start stream thread
|
|
|
|
|
if (stream_thread_ == nullptr && cur != segments_.end() && cur->second->isLoaded()) { |
|
|
|
|
startStream(cur->second.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { |
|
|
|
@ -170,7 +169,6 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: |
|
|
|
|
|
|
|
|
|
if (segments_need_merge != segments_merged_) { |
|
|
|
|
qDebug() << "merge segments" << segments_need_merge; |
|
|
|
|
// merge & sort events
|
|
|
|
|
std::vector<Event *> *new_events = new std::vector<Event *>(); |
|
|
|
|
new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0, |
|
|
|
|
[=](int v, int n) { return v + segments_[n]->log->events.size(); })); |
|
|
|
@ -179,19 +177,9 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: |
|
|
|
|
auto middle = new_events->insert(new_events->end(), e.begin(), e.end()); |
|
|
|
|
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); |
|
|
|
|
} |
|
|
|
|
// update events
|
|
|
|
|
|
|
|
|
|
auto prev_events = events_; |
|
|
|
|
updateEvents([&]() { |
|
|
|
|
if (route_start_ts_ == 0) { |
|
|
|
|
// get route start time from initData
|
|
|
|
|
auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); |
|
|
|
|
if (it != new_events->end()) { |
|
|
|
|
route_start_ts_ = (*it)->mono_time; |
|
|
|
|
// cur_mono_time_ is set by seekTo in start() before get route_start_ts_
|
|
|
|
|
cur_mono_time_ += route_start_ts_; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
events_ = new_events; |
|
|
|
|
segments_merged_ = segments_need_merge; |
|
|
|
|
return true; |
|
|
|
@ -202,6 +190,38 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Replay::startStream(const Segment *cur_segment) { |
|
|
|
|
const auto &events = cur_segment->log->events; |
|
|
|
|
|
|
|
|
|
// get route start time from initData
|
|
|
|
|
auto it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); |
|
|
|
|
route_start_ts_ = it != events.end() ? (*it)->mono_time : events[0]->mono_time; |
|
|
|
|
cur_mono_time_ += route_start_ts_; |
|
|
|
|
|
|
|
|
|
// write CarParams
|
|
|
|
|
it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; }); |
|
|
|
|
if (it != events.end()) { |
|
|
|
|
auto bytes = (*it)->bytes(); |
|
|
|
|
Params().put("CarParams", (const char *)bytes.begin(), bytes.size()); |
|
|
|
|
} else { |
|
|
|
|
qInfo() << "failed to read CarParams from current segment"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// start camera server
|
|
|
|
|
std::pair<int, int> cameras[MAX_CAMERAS] = {}; |
|
|
|
|
for (auto type : ALL_CAMERAS) { |
|
|
|
|
if (auto &fr = cur_segment->frames[type]) { |
|
|
|
|
cameras[type] = {fr->width, fr->height}; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
camera_server_ = std::make_unique<CameraServer>(cameras); |
|
|
|
|
|
|
|
|
|
// start stream thread
|
|
|
|
|
stream_thread_ = QThread::create(&Replay::stream, this); |
|
|
|
|
QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater); |
|
|
|
|
stream_thread_->start(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Replay::publishMessage(const Event *e) { |
|
|
|
|
if (sm == nullptr) { |
|
|
|
|
auto bytes = e->bytes(); |
|
|
|
@ -221,15 +241,13 @@ void Replay::publishFrame(const Event *e) { |
|
|
|
|
{cereal::Event::DRIVER_ENCODE_IDX, DriverCam}, |
|
|
|
|
{cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam}, |
|
|
|
|
}; |
|
|
|
|
auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs<cereal::EncodeIndex>(); |
|
|
|
|
if (std::find(segments_merged_.begin(), segments_merged_.end(), eidx.getSegmentNum()) == segments_merged_.end()) { |
|
|
|
|
// eidx's segment is not loaded
|
|
|
|
|
if ((e->which == cereal::Event::DRIVER_ENCODE_IDX && !load_dcam) || (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !load_ecam)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
CameraType cam = cam_types.at(e->which); |
|
|
|
|
auto &fr = segments_[eidx.getSegmentNum()]->frames[cam]; |
|
|
|
|
if (fr && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) { |
|
|
|
|
camera_server_->pushFrame(cam, fr.get(), eidx); |
|
|
|
|
auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs<cereal::EncodeIndex>(); |
|
|
|
|
if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentLoaded(eidx.getSegmentNum())) { |
|
|
|
|
CameraType cam = cam_types.at(e->which); |
|
|
|
|
camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -295,8 +313,12 @@ void Replay::stream() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// wait for frame to be sent before unlock.(frameReader may be deleted after unlock)
|
|
|
|
|
camera_server_->waitFinish(); |
|
|
|
|
|
|
|
|
|
if (eit == events_->end() && (current_segment_ == segments_.rbegin()->first) && isSegmentLoaded(current_segment_)) { |
|
|
|
|
qInfo() << "reaches the end of route, restart from beginning"; |
|
|
|
|
emit seekTo(0, false); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|