#include "selfdrive/ui/replay/replay.h"

#include <cstdint>
#include <limits>
#include <stdexcept>

// NOTE(review): the two angle-bracket header names were lost in the extracted
// text of this file; restored as the Qt JSON headers used by parseResponse() —
// confirm against the repository.
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>

#include "cereal/services.h"
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/common/timing.h"
#include "selfdrive/hardware/hw.h"

namespace {

// Nanoseconds from `start` to `now`, saturating at 0 instead of wrapping around
// when `now` is earlier than `start` (both are unsigned).
uint64_t elapsedSince(uint64_t now, uint64_t start) {
  return now > start ? now - start : 0;
}

}  // namespace

// Reads a single character from stdin with canonical mode and echo switched
// off, then restores the previous terminal settings.
// Returns whatever getchar() returned (EOF on end-of-input or error).
int getch() {
  struct termios oldt;
  tcgetattr(STDIN_FILENO, &oldt);

  struct termios newt = oldt;
  newt.c_lflag &= ~(ICANON | ECHO);
  tcsetattr(STDIN_FILENO, TCSANOW, &newt);

  const int ch = getchar();

  tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
  return ch;
}

// Selects the services to replay from the comma-separated BLOCK and ALLOW
// environment variables, creates a PubMaster when no SubMaster was supplied,
// and requests the file list of `route`; the reply is handled by parseResponse().
Replay::Replay(QString route, SubMaster *sm_, QObject *parent) : QObject(parent), sm(sm_) {
  QStringList block = QString(getenv("BLOCK")).split(",");
  qDebug() << "blocklist" << block;

  QStringList allow = QString(getenv("ALLOW")).split(",");
  qDebug() << "allowlist" << allow;

  // NOTE(review): element type reconstructed from usage (pushes it.name, passed to PubMaster) — confirm.
  std::vector<const char *> s;
  // split() on an empty string yields one empty entry, so allow[0] always exists;
  // an empty first entry means "no allowlist".
  const bool allow_all = allow[0].size() == 0;
  for (const auto &it : services) {
    if ((allow_all || allow.contains(it.name)) && !block.contains(it.name)) {
      s.push_back(it.name);
      socks.append(std::string(it.name));
    }
  }
  qDebug() << "services " << s;

  if (sm == nullptr) {
    pm = new PubMaster(s);
  }

  const QString url = CommaApi::BASE_URL + "/v1/route/" + route + "/files";
  http = new HttpRequest(this, !Hardware::PC());
  QObject::connect(http, &HttpRequest::receivedResponse, this, &Replay::parseResponse);
  http->sendRequest(url);
}

// Parses the route file listing, stores the "cameras" and "logs" arrays and
// seeks to the beginning of the route. Does nothing but log if the JSON is invalid.
void Replay::parseResponse(const QString &response) {
  QJsonDocument doc = QJsonDocument::fromJson(response.trimmed().toUtf8());
  if (doc.isNull()) {
    qDebug() << "JSON Parse failed";
    return;
  }

  camera_paths = doc["cameras"].toArray();
  log_paths = doc["logs"].toArray();

  seekTime(0);
}

// Starts loading segment `n` (log and camera file) unless a LogReader for it
// already exists. `n` must be a valid index into both path arrays.
void Replay::addSegment(int n) {
  assert((n >= 0) && (n < log_paths.size()) && (n < camera_paths.size()));
  if (lrs.find(n) != lrs.end()) {
    return;
  }

  LogReader *log_reader = new LogReader(log_paths.at(n).toString());
  lrs[n] = log_reader;
  // this is a queued connection, mergeEvents is executed in the main thread.
  QObject::connect(log_reader, &LogReader::finished, this, &Replay::mergeEvents);

  FrameReader *frame_reader = new FrameReader(qPrintable(camera_paths.at(n).toString()));
  frs[n] = frame_reader;
  // Capture the reader itself rather than `this`: the worker must not index
  // `frs` while other threads insert into / remove from it.
  QThread *t = QThread::create([frame_reader]() { frame_reader->process(); });
  QObject::connect(t, &QThread::finished, t, &QThread::deleteLater);
  t->start();
}

// Rebuilds the merged event map and encode-index tables from the segments in
// the window around current_segment, swaps them in under the lock, then frees
// the previous tables and every segment outside the window.
void Replay::mergeEvents() {
  const int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
  const int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());

  // merge logs
  // NOTE(review): template arguments reconstructed from usage (uint64_t keys,
  // Event* values, frame id -> EncodeIdx) — confirm against the header.
  QMultiMap<uint64_t, Event *> *new_events = new QMultiMap<uint64_t, Event *>();
  std::unordered_map<uint32_t, EncodeIdx> *new_eidx = new std::unordered_map<uint32_t, EncodeIdx>[MAX_CAMERAS];
  for (int i = start_idx; i <= end_idx; ++i) {
    if (auto it = lrs.find(i); it != lrs.end()) {
      *new_events += (*it)->events;
      for (CameraType cam_type : ALL_CAMERAS) {
        new_eidx[cam_type].insert((*it)->eidx[cam_type].begin(), (*it)->eidx[cam_type].end());
      }
    }
  }

  // update logs
  updating_events = true;  // set updating_events to true to force stream thread release the lock
  lock.lock();
  auto prev_events = std::exchange(events, new_events);
  auto prev_eidx = std::exchange(eidx, new_eidx);
  lock.unlock();

  // free logs
  delete prev_events;
  delete[] prev_eidx;
  // NOTE(review): a FrameReader deleted here may still be running process() on
  // the thread started in addSegment() — confirm its destructor copes with that.
  for (int i = 0; i < log_paths.size(); i++) {
    if (i < start_idx || i > end_idx) {
      delete lrs.take(i);
      delete frs.take(i);
    }
  }
}

// Launches the three worker threads: event streaming, keyboard control and
// segment window maintenance.
void Replay::start() {
  thread = new QThread;
  QObject::connect(thread, &QThread::started, [=]() { stream(); });
  thread->start();

  kb_thread = new QThread;
  QObject::connect(kb_thread, &QThread::started, [=]() { keyboardThread(); });
  kb_thread->start();

  queue_thread = new QThread;
  QObject::connect(queue_thread, &QThread::started, [=]() { segmentQueueThread(); });
  queue_thread->start();
}

// Requests a seek to `ts` seconds from the route start (60 seconds per
// segment); `ts` is clamped to [0, number of segments * 60].
void Replay::seekTime(int ts) {
  ts = std::clamp(ts, 0, log_paths.size() * 60);
  qInfo() << "seeking to " << ts;

  seek_ts = ts;
  current_segment = ts / 60;
  updating_events = true;  // makes stream() leave its publish loop and pick up seek_ts
}

// Every 100 ms, makes sure all segments in the window
// [current_segment - BACKWARD_SEGS, current_segment + FORWARD_SEGS] are being loaded.
void Replay::segmentQueueThread() {
  // maintain the segment window
  while (true) {
    const int start_idx = std::max(current_segment - BACKWARD_SEGS, 0);
    const int end_idx = std::min(current_segment + FORWARD_SEGS, log_paths.size());
    for (int i = start_idx; i <= end_idx && i < log_paths.size(); ++i) {
      addSegment(i);
    }
    QThread::msleep(100);
  }
}

// Keyboard control loop:
//   Enter  prompt for a seek target: "<seconds>" or "#<segment>"
//   m / M  seek forward / backward 60 s
//   s / S  seek forward / backward 10 s
//   G      seek to the start
// Returns when stdin reaches end-of-input.
void Replay::keyboardThread() {
  while (true) {
    // Keep the int so EOF is distinguishable from a real character.
    const int c = getch();
    if (c == EOF) {
      // Without this the loop would spin forever on a closed stdin.
      qDebug() << "stdin closed, keyboard control disabled";
      return;
    }

    if (c == '\n') {
      printf("Enter seek request: ");
      std::string r;
      std::cin >> r;

      try {
        const bool is_segment = !r.empty() && r[0] == '#';
        if (is_segment) {
          r.erase(0, 1);
        }
        // Multiply in 64 bits and clamp so a huge segment number cannot overflow int.
        int64_t ts = std::stoi(r);
        if (is_segment) {
          ts *= 60;
        }
        ts = std::clamp<int64_t>(ts, std::numeric_limits<int>::min(), std::numeric_limits<int>::max());
        seekTime(static_cast<int>(ts));
      } catch (const std::invalid_argument &) {
        qDebug() << "invalid argument";
      } catch (const std::out_of_range &) {
        // std::stoi throws this for values that do not fit in an int.
        qDebug() << "seek value out of range";
      }
      getch();  // remove \n from entering seek
    } else if (c == 'm') {
      seekTime(current_ts + 60);
    } else if (c == 'M') {
      seekTime(current_ts - 60);
    } else if (c == 's') {
      seekTime(current_ts + 10);
    } else if (c == 'S') {
      seekTime(current_ts - 10);
    } else if (c == 'G') {
      seekTime(0);
    }
  }
}

// Streaming loop: walks the merged events in time order starting at the seek
// position (or where it last stopped), sleeps to follow the logged timing,
// sends road camera frames over VisionIPC and publishes each message through
// the PubMaster, or feeds it to the SubMaster when one was supplied.
// The inner loop is abandoned whenever updating_events is set.
void Replay::stream() {
  QElapsedTimer timer;
  timer.start();

  route_start_ts = 0;
  uint64_t cur_mono_time = 0;
  while (true) {
    std::unique_lock lk(lock);

    if (!events || events->size() == 0) {
      lk.unlock();
      qDebug() << "waiting for events";
      QThread::msleep(100);
      continue;
    }

    // TODO: use initData's logMonoTime
    if (route_start_ts == 0) {
      route_start_ts = events->firstKey();
    }
    const uint64_t start_ts = route_start_ts;

    uint64_t t0 = seek_ts != -1 ? start_ts + (seek_ts * 1e9) : cur_mono_time;
    seek_ts = -1;
    // t0 can be below start_ts (cur_mono_time starts at 0), hence the saturating difference.
    qDebug() << "unlogging at" << int(elapsedSince(t0, start_ts) / 1e9);
    uint64_t t0r = timer.nsecsElapsed();

    for (auto eit = events->lowerBound(t0); !updating_events && eit != events->end(); ++eit) {
      cereal::Event::Reader e = (*eit)->event;
      cur_mono_time = (*eit)->mono_time;

      const uint64_t elapsed_ns = elapsedSince(cur_mono_time, start_ts);
      current_segment = elapsed_ns / 1e9 / 60;

      std::string type;
      KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(e).which()) {
        type = e_->getProto().getName();
      }

      current_ts = elapsed_ns / 1e9;

      if (!socks.contains(type)) {
        continue;
      }

      float timestamp = elapsed_ns / 1e9;
      if (std::abs(timestamp - last_print) > 5.0) {
        last_print = timestamp;
        qInfo() << "at " << int(last_print) << "s";
      }

      // keep time: sleep by however much the log is ahead of wall clock (under 1 s only)
      long etime = cur_mono_time - t0;
      long rtime = timer.nsecsElapsed() - t0r;
      long us_behind = ((etime - rtime) * 1e-3) + 0.5;
      if (us_behind > 0 && us_behind < 1e6) {
        QThread::usleep(us_behind);
        //qDebug() << "sleeping" << us_behind << etime << timer.nsecsElapsed();
      }

      // publish frame
      // TODO: publish all frames
      if (type == "roadCameraState") {
        auto fr = e.getRoadCameraState();

        auto it_ = eidx[RoadCam].find(fr.getFrameId());
        if (it_ != eidx[RoadCam].end()) {
          EncodeIdx &enc = it_->second;
          if (auto fit = frs.find(enc.segmentNum); fit != frs.end()) {
            auto frm = *fit;
            if (vipc_server == nullptr) {
              // The server is created lazily, sized from the first frame reader used.
              cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT);
              cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err));

              vipc_server = new VisionIpcServer("camerad", device_id, context);
              vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, UI_BUF_COUNT,
                                          true, frm->width, frm->height);
              vipc_server->start_listener();
            }

            uint8_t *dat = frm->get(enc.frameEncodeId);
            if (dat) {
              VisionIpcBufExtra extra = {};
              VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK);
              memcpy(buf->addr, dat, frm->getRGBSize());
              vipc_server->send(buf, &extra, false);
            }
          }
        }
      }

      // publish msg
      if (sm == nullptr) {
        auto bytes = (*eit)->bytes();
        pm->send(type.c_str(), (capnp::byte *)bytes.begin(), bytes.size());
      } else {
        std::vector<std::pair<std::string, cereal::Event::Reader>> messages;
        messages.push_back({type, e});
        sm->update_msgs(nanos_since_boot(), messages);
      }
    }
    lk.unlock();
    updating_events = false;
    usleep(0);
  }
}