openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

202 lines
5.5 KiB

#include <cmath>
#include <string>
#include <vector>
#include <capnp/dynamic.h>
#include <capnp/schema.h>
// include the dynamic struct
#include "cereal/gen/cpp/log.capnp.c++"
#include "cereal/gen/cpp/car.capnp.c++"
#include "cereal/gen/cpp/legacy.capnp.c++"
#include "cereal/services.h"
#include "Unlogger.hpp"
#include <stdint.h>
#include <time.h>
#include "common/timing.h"
Unlogger::Unlogger(Events *events_, QReadWriteLock* events_lock_, QMap<int, FrameReader*> *frs_, int seek)
: events(events_), events_lock(events_lock_), frs(frs_) {
ctx = Context::create();
seek_request = seek*1e9;
QStringList block = QString(getenv("BLOCK")).split(",");
qDebug() << "blocklist" << block;
QStringList allow = QString(getenv("ALLOW")).split(",");
qDebug() << "allowlist" << allow;
for (const auto& it : services) {
std::string name = it.name;
if (allow[0].size() > 0 && !allow.contains(name.c_str())) {
qDebug() << "not allowing" << name.c_str();
continue;
}
if (block.contains(name.c_str())) {
qDebug() << "blocking" << name.c_str();
continue;
}
PubSocket *sock = PubSocket::create(ctx, name);
if (sock == NULL) {
qDebug() << "FAILED" << name.c_str();
continue;
}
qDebug() << name.c_str();
socks.insert(name, sock);
}
}
void Unlogger::process(SubMaster *sm) {
qDebug() << "hello from unlogger thread";
while (events->size() == 0) {
qDebug() << "waiting for events";
QThread::sleep(1);
}
qDebug() << "got events";
// TODO: hack
if (seek_request != 0) {
seek_request += events->begin().key();
while (events->lowerBound(seek_request) == events->end()) {
qDebug() << "waiting for desired time";
QThread::sleep(1);
}
}
QElapsedTimer timer;
timer.start();
uint64_t last_elapsed = 0;
// loops
while (1) {
uint64_t t0 = (events->begin()+1).key();
uint64_t t0r = timer.nsecsElapsed();
qDebug() << "unlogging at" << t0;
auto eit = events->lowerBound(t0);
while (eit != events->end()) {
float time_to_end = ((events->lastKey() - eit.key())/1e9);
if (loading_segment && (time_to_end > 20.0)){
loading_segment = false;
}
while (paused) {
QThread::usleep(1000);
t0 = eit->getLogMonoTime();
t0r = timer.nsecsElapsed();
}
if (seek_request != 0) {
t0 = seek_request;
qDebug() << "seeking to" << t0;
t0r = timer.nsecsElapsed();
eit = events->lowerBound(t0);
seek_request = 0;
if (eit == events->end()) {
qWarning() << "seek off end";
break;
}
}
if (abs(((long long)tc-(long long)last_elapsed)) > 50e6) {
//qDebug() << "elapsed";
emit elapsed();
last_elapsed = tc;
}
cereal::Event::Reader e = *eit;
capnp::DynamicStruct::Reader e_ds = static_cast<capnp::DynamicStruct::Reader>(e);
std::string type;
KJ_IF_MAYBE(e_, e_ds.which()){
type = e_->getProto().getName();
}
uint64_t tm = e.getLogMonoTime();
auto it = socks.find(type);
tc = tm;
if (it != socks.end()) {
long etime = tm-t0;
float timestamp = etime/1e9;
if(std::abs(timestamp-last_print) > 5.0){
last_print = timestamp;
printf("at %f\n", last_print);
}
long rtime = timer.nsecsElapsed() - t0r;
long us_behind = ((etime-rtime)*1e-3)+0.5;
if (us_behind > 0) {
if (us_behind > 1e6) {
qWarning() << "OVER ONE SECOND BEHIND, HACKING" << us_behind;
us_behind = 0;
t0 = tm;
t0r = timer.nsecsElapsed();
}
QThread::usleep(us_behind);
//qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed();
}
if (type == "roadCameraState") {
auto fr = e.getRoadCameraState();
auto it_ = eidx.find(fr.getFrameId());
if (it_ != eidx.end()) {
auto pp = *it_;
//qDebug() << fr.getRoadCameraStateId() << pp;
if (frs->find(pp.first) != frs->end()) {
auto frm = (*frs)[pp.first];
auto data = frm->get(pp.second);
if (vipc_server == nullptr) {
cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));
vipc_server = new VisionIpcServer("camerad", device_id, context);
vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, 4, true, frm->width, frm->height);
vipc_server->start_listener();
}
VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK);
memcpy(buf->addr, data, frm->getRGBSize());
VisionIpcBufExtra extra = {};
vipc_server->send(buf, &extra, false);
}
}
}
if (sm == nullptr){
capnp::MallocMessageBuilder msg;
msg.setRoot(e);
auto words = capnp::messageToFlatArray(msg);
auto bytes = words.asBytes();
(*it)->send((char*)bytes.begin(), bytes.size());
} else{
std::vector<std::pair<std::string, cereal::Event::Reader>> messages;
messages.push_back({type, e});
sm->update_msgs(nanos_since_boot(), messages);
}
}
++eit;
if (time_to_end < 10.0 && !loading_segment){
loading_segment = true;
emit loadSegment();
}
}
}
}