replay: refactor Route and Segment (#22531)

* new functions

* fix wrong call to qUrl::isLocalFile

* cleanup

* keep extension in cached files

* cleanup

* simplify segment

* delete thread

* add output

* pre-decompress

* remove suffix

* revert remove suffix

* 1 connection for log file

* cleanup

* segment may not be continuous,use map

* don't emit finish if aborting_

* use QFuture and thread pool

* cleanup

* fix segfault in LoadFromLocal

* cleanup

* handle segment failed to load

* output info

* continue error handling

* Remove redundant testSeekTo

* cleanup

* always return true

* keep time is ok now

change to 1s

write 1 byte at the end of the sparse file

* log loading segment

* merge #22476

* Update selfdrive/ui/replay/logreader.cc

* correct connect

* pub message in function

* typo

* Update selfdrive/ui/replay/replay.cc

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: acc52ece20
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent b87ec5ad68
commit f5335bd42d
  1. 6
      selfdrive/ui/replay/logreader.cc
  2. 5
      selfdrive/ui/replay/logreader.h
  3. 78
      selfdrive/ui/replay/replay.cc
  4. 4
      selfdrive/ui/replay/replay.h
  5. 217
      selfdrive/ui/replay/route.cc
  6. 38
      selfdrive/ui/replay/route.h
  7. 4
      selfdrive/ui/replay/tests/test_replay.cc
  8. 6
      selfdrive/ui/replay/util.cc

@ -1,6 +1,5 @@
#include "selfdrive/ui/replay/logreader.h" #include "selfdrive/ui/replay/logreader.h"
#include <cassert>
#include <sstream> #include <sstream>
#include "selfdrive/common/util.h" #include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/util.h" #include "selfdrive/ui/replay/util.h"
@ -32,8 +31,9 @@ LogReader::~LogReader() {
for (auto e : events) delete e; for (auto e : events) delete e;
} }
bool LogReader::load(const std::string &file, bool is_bz2file) { bool LogReader::load(const std::string &file) {
if (is_bz2file) { bool is_bz2 = file.rfind(".bz2") == file.length() - 4;
if (is_bz2) {
std::ostringstream stream; std::ostringstream stream;
if (!readBZ2File(file, stream)) { if (!readBZ2File(file, stream)) {
LOGW("bz2 decompress failed"); LOGW("bz2 decompress failed");

@ -1,8 +1,5 @@
#pragma once #pragma once
#include <unordered_map>
#include <cassert>
#include <capnp/serialize.h> #include <capnp/serialize.h>
#include "cereal/gen/cpp/log.capnp.h" #include "cereal/gen/cpp/log.capnp.h"
#include "selfdrive/camerad/cameras/camera_common.h" #include "selfdrive/camerad/cameras/camera_common.h"
@ -38,7 +35,7 @@ class LogReader {
public: public:
LogReader() = default; LogReader() = default;
~LogReader(); ~LogReader();
bool load(const std::string &file, bool is_bz2file); bool load(const std::string &file);
std::vector<Event*> events; std::vector<Event*> events;

@ -57,10 +57,9 @@ bool Replay::load() {
return false; return false;
} }
for (int i = 0; i < route_->size(); ++i) { for (auto &[n, f] : route_->segments()) {
const SegmentFile &f = route_->at(i);
if ((!f.rlog.isEmpty() || !f.qlog.isEmpty()) && (!f.road_cam.isEmpty() || !f.qcamera.isEmpty())) { if ((!f.rlog.isEmpty() || !f.qlog.isEmpty()) && (!f.road_cam.isEmpty() || !f.qcamera.isEmpty())) {
segments_[i] = nullptr; segments_[n] = nullptr;
} }
} }
if (segments_.empty()) { if (segments_.empty()) {
@ -98,8 +97,9 @@ void Replay::doSeek(int seconds, bool relative) {
seconds += currentSeconds(); seconds += currentSeconds();
} }
qInfo() << "seeking to" << seconds; qInfo() << "seeking to" << seconds;
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.rbegin()->first * 60) * 1e9; const int max_segment_number = segments_.rbegin()->first;
current_segment_ = std::min(seconds / 60, (int)segments_.rbegin()->first - 1); cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (max_segment_number + 1) * 60) * 1e9;
current_segment_ = std::min(seconds / 60, max_segment_number);
return false; return false;
}); });
queueSegment(); queueSegment();
@ -122,16 +122,32 @@ void Replay::setCurrentSegment(int n) {
} }
} }
// maintain the segment window void Replay::segmentLoadFinished(bool success) {
if (!success) {
Segment *seg = qobject_cast<Segment *>(sender());
qInfo() << "failed to load segment " << seg->seg_num << ", removing it from current replay list";
segments_.erase(seg->seg_num);
}
queueSegment();
}
void Replay::queueSegment() { void Replay::queueSegment() {
// forward fetch segments // get the current segment window
SegmentMap::iterator begin, end; SegmentMap::iterator begin, end;
begin = end = segments_.lower_bound(current_segment_); begin = end = segments_.lower_bound(current_segment_);
for (int fwd = 0; end != segments_.end() && fwd <= FORWARD_SEGS; ++end, ++fwd) { for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) {
auto &[n, seg] = *end; --begin;
}
for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) {
++end;
}
// load segments
for (auto it = begin; it != end; ++it) {
auto &[n, seg] = *it;
if (!seg) { if (!seg) {
seg = std::make_unique<Segment>(n, route_->at(n), load_dcam, load_ecam); seg = std::make_unique<Segment>(n, route_->at(n), load_dcam, load_ecam);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::queueSegment); QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
qInfo() << "loading segment" << n << "...";
} }
} }
// merge segments // merge segments
@ -159,8 +175,8 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0, new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0,
[=](int v, int n) { return v + segments_[n]->log->events.size(); })); [=](int v, int n) { return v + segments_[n]->log->events.size(); }));
for (int n : segments_need_merge) { for (int n : segments_need_merge) {
auto &log = segments_[n]->log; auto &e = segments_[n]->log->events;
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end()); auto middle = new_events->insert(new_events->end(), e.begin(), e.end());
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan());
} }
// update events // update events
@ -182,7 +198,20 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
}); });
delete prev_events; delete prev_events;
} else { } else {
updateEvents([=]() { return begin->second->isLoaded(); }); updateEvents([=]() { return true; });
}
}
void Replay::publishMessage(const Event *e) {
if (sm == nullptr) {
auto bytes = e->bytes();
int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
qDebug() << "stop publishing" << sockets_[e->which] << "due to multiple publishers error";
sockets_[e->which] = nullptr;
}
} else {
sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], e->event}});
} }
} }
@ -222,8 +251,8 @@ void Replay::stream() {
continue; continue;
} }
const uint64_t evt_start_ts = cur_mono_time_; uint64_t evt_start_ts = cur_mono_time_;
const uint64_t loop_start_ts = nanos_since_boot(); uint64_t loop_start_ts = nanos_since_boot();
for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) { for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) {
const Event *evt = (*eit); const Event *evt = (*eit);
@ -250,24 +279,19 @@ void Replay::stream() {
long etime = cur_mono_time_ - evt_start_ts; long etime = cur_mono_time_ - evt_start_ts;
long rtime = nanos_since_boot() - loop_start_ts; long rtime = nanos_since_boot() - loop_start_ts;
long behind_ns = etime - rtime; long behind_ns = etime - rtime;
if (behind_ns > 0) { // if behind_ns is greater than 1 second, it means that an invalid segemnt is skipped by seeking/replaying
if (behind_ns >= 1 * 1e9) {
// reset start times
evt_start_ts = cur_mono_time_;
loop_start_ts = nanos_since_boot();
} else if (behind_ns > 0) {
precise_nano_sleep(behind_ns); precise_nano_sleep(behind_ns);
} }
if (evt->frame) { if (evt->frame) {
publishFrame(evt); publishFrame(evt);
} else { } else {
// publish msg publishMessage(evt);
if (sm == nullptr) {
auto bytes = evt->bytes();
int ret = pm->send(sockets_[cur_which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
qDebug() << "stop publishing" << sockets_[cur_which] << "due to multiple publishers error";
sockets_[cur_which] = nullptr;
}
} else {
sm->update_msgs(nanos_since_boot(), {{sockets_[cur_which], evt->event}});
}
} }
} }
} }

@ -6,7 +6,7 @@
#include "selfdrive/ui/replay/route.h" #include "selfdrive/ui/replay/route.h"
constexpr int FORWARD_SEGS = 2; constexpr int FORWARD_SEGS = 2;
constexpr int BACKWARD_SEGS = 2; constexpr int BACKWARD_SEGS = 1;
class Replay : public QObject { class Replay : public QObject {
Q_OBJECT Q_OBJECT
@ -27,6 +27,7 @@ signals:
protected slots: protected slots:
void queueSegment(); void queueSegment();
void doSeek(int seconds, bool relative); void doSeek(int seconds, bool relative);
void segmentLoadFinished(bool sucess);
protected: protected:
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap; typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
@ -34,6 +35,7 @@ protected:
void setCurrentSegment(int n); void setCurrentSegment(int n);
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
void updateEvents(const std::function<bool()>& lambda); void updateEvents(const std::function<bool()>& lambda);
void publishMessage(const Event *e);
void publishFrame(const Event *e); void publishFrame(const Event *e);
inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; }

@ -1,40 +1,40 @@
#include "selfdrive/ui/replay/route.h" #include "selfdrive/ui/replay/route.h"
#include <QEventLoop> #include <QEventLoop>
#include <QFile>
#include <QJsonArray> #include <QJsonArray>
#include <QJsonDocument> #include <QJsonDocument>
#include <QRegExp> #include <QRegExp>
#include <QThread> #include <QtConcurrent>
#include <future>
#include "selfdrive/hardware/hw.h" #include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/qt/api.h" #include "selfdrive/ui/qt/api.h"
#include "selfdrive/ui/replay/util.h" #include "selfdrive/ui/replay/util.h"
Route::Route(const QString &route, const QString &data_dir) : route_(route), data_dir_(data_dir) {}
bool Route::load() { bool Route::load() {
if (data_dir_.isEmpty()) { if (data_dir_.isEmpty()) {
QEventLoop loop; return loadFromServer();
auto onError = [&loop](const QString &err) { loop.quit(); };
bool ret = false;
HttpRequest http(nullptr, !Hardware::PC());
QObject::connect(&http, &HttpRequest::failedResponse, onError);
QObject::connect(&http, &HttpRequest::timeoutResponse, onError);
QObject::connect(&http, &HttpRequest::receivedResponse, [&](const QString json) {
ret = loadFromJson(json);
loop.quit();
});
http.sendRequest("https://api.commadotai.com/v1/route/" + route_ + "/files");
loop.exec();
return ret;
} else { } else {
return loadFromLocal(); return loadFromLocal();
} }
} }
bool Route::loadFromServer() {
QEventLoop loop;
auto onError = [&loop](const QString &err) { loop.quit(); };
bool ret = false;
HttpRequest http(nullptr, !Hardware::PC());
QObject::connect(&http, &HttpRequest::failedResponse, onError);
QObject::connect(&http, &HttpRequest::timeoutResponse, onError);
QObject::connect(&http, &HttpRequest::receivedResponse, [&](const QString json) {
ret = loadFromJson(json);
loop.quit();
});
http.sendRequest("https://api.commadotai.com/v1/route/" + route_ + "/files");
loop.exec();
return ret;
}
bool Route::loadFromJson(const QString &json) { bool Route::loadFromJson(const QString &json) {
QJsonObject route_files = QJsonDocument::fromJson(json.trimmed().toUtf8()).object(); QJsonObject route_files = QJsonDocument::fromJson(json.trimmed().toUtf8()).object();
if (route_files.empty()) { if (route_files.empty()) {
@ -47,23 +47,7 @@ bool Route::loadFromJson(const QString &json) {
for (const auto &url : route_files[key].toArray()) { for (const auto &url : route_files[key].toArray()) {
QString url_str = url.toString(); QString url_str = url.toString();
if (rx.indexIn(url_str) != -1) { if (rx.indexIn(url_str) != -1) {
const int seg_num = rx.cap(1).toInt(); addFileToSegment(rx.cap(1).toInt(), url_str);
if (segments_.size() <= seg_num) {
segments_.resize(seg_num + 1);
}
if (key == "logs") {
segments_[seg_num].rlog = url_str;
} else if (key == "qlogs") {
segments_[seg_num].qlog = url_str;
} else if (key == "cameras") {
segments_[seg_num].road_cam = url_str;
} else if (key == "dcameras") {
segments_[seg_num].driver_cam = url_str;
} else if (key == "ecameras") {
segments_[seg_num].wide_road_cam = url_str;
} else if (key == "qcameras") {
segments_[seg_num].qcamera = url_str;
}
} }
} }
} }
@ -79,121 +63,94 @@ bool Route::loadFromLocal() {
if (folders.isEmpty()) return false; if (folders.isEmpty()) return false;
for (auto folder : folders) { for (auto folder : folders) {
const int seg_num = folder.split("--")[2].toInt(); int seg_num_pos = folder.lastIndexOf("--");
if (segments_.size() <= seg_num) { if (seg_num_pos != -1) {
segments_.resize(seg_num + 1); const int seg_num = folder.mid(seg_num_pos + 2).toInt();
} QDir segment_dir(log_dir.filePath(folder));
QDir segment_dir(log_dir.filePath(folder)); for (auto f : segment_dir.entryList(QDir::Files)) {
for (auto f : segment_dir.entryList(QDir::Files)) { addFileToSegment(seg_num, segment_dir.absoluteFilePath(f));
const QString file_path = segment_dir.absoluteFilePath(f);
if (f.startsWith("rlog")) {
segments_[seg_num].rlog = file_path;
} else if (f.startsWith("qlog")) {
segments_[seg_num].qlog = file_path;
} else if (f.startsWith("fcamera")) {
segments_[seg_num].road_cam = file_path;
} else if (f.startsWith("dcamera")) {
segments_[seg_num].driver_cam = file_path;
} else if (f.startsWith("ecamera")) {
segments_[seg_num].wide_road_cam = file_path;
} else if (f.startsWith("qcamera")) {
segments_[seg_num].qcamera = file_path;
} }
} }
} }
return true; return true;
} }
void Route::addFileToSegment(int n, const QString &file) {
const QString name = QUrl(file).fileName();
if (name == "rlog.bz2") {
segments_[n].rlog = file;
} else if (name == "qlog.bz2") {
segments_[n].qlog = file;
} else if (name == "fcamera.hevc") {
segments_[n].road_cam = file;
} else if (name == "dcamera.hevc") {
segments_[n].driver_cam = file;
} else if (name == "ecamera.hevc") {
segments_[n].wide_road_cam = file;
} else if (name == "qcamera.ts") {
segments_[n].qcamera = file;
}
}
// class Segment // class Segment
Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam) : seg_num_(n), files_(segment_files) { Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam) : seg_num(n) {
static std::once_flag once_flag; static std::once_flag once_flag;
std::call_once(once_flag, [=]() { std::call_once(once_flag, [=]() { if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); });
if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath());
}); // the order is [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const QString file_list[] = {
// fallback to qcamera/qlog files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
road_cam_path_ = files_.road_cam.isEmpty() ? files_.qcamera : files_.road_cam; load_dcam ? files.driver_cam : "",
log_path_ = files_.rlog.isEmpty() ? files_.qlog : files_.rlog; load_ecam ? files.wide_road_cam : "",
assert (!log_path_.isEmpty() && !road_cam_path_.isEmpty()); files.rlog.isEmpty() ? files.qlog : files.rlog,
};
if (!load_dcam) { for (int i = 0; i < std::size(file_list); i++) {
files_.driver_cam = ""; if (!file_list[i].isEmpty()) {
} loading_++;
if (!load_ecam) { synchronizer_.addFuture(QtConcurrent::run(this, &Segment::loadFile, i, file_list[i].toStdString()));
files_.wide_road_cam = "";
}
if (!QUrl(log_path_).isLocalFile()) {
for (auto &url : {log_path_, road_cam_path_, files_.driver_cam, files_.wide_road_cam}) {
if (!url.isEmpty() && !QFile::exists(localPath(url))) {
downloadFile(url);
++downloading_;
}
} }
} }
if (downloading_ == 0) {
QTimer::singleShot(0, this, &Segment::load);
} else {
qDebug() << "downloading segment" << seg_num_ << "...";
}
} }
Segment::~Segment() { Segment::~Segment() {
aborting_ = true; aborting_ = true;
if (downloading_ > 0) { synchronizer_.setCancelOnWait(true);
qDebug() << "cancel download segment" << seg_num_; synchronizer_.waitForFinished();
}
for (auto &t : download_threads_) {
if (t->isRunning()) t->wait();
}
} }
void Segment::downloadFile(const QString &url) { void Segment::loadFile(int id, const std::string file) {
download_threads_.emplace_back(QThread::create([=]() { const bool is_remote = file.find("https://") == 0;
const std::string local_file = localPath(url).toStdString(); const std::string local_file = is_remote ? cacheFilePath(file) : file;
bool ret = httpMultiPartDownload(url.toStdString(), local_file, connections_per_file, &aborting_); bool file_ready = util::file_exists(local_file);
if (ret && url == log_path_) {
// pre-decompress log file. if (!file_ready && is_remote) {
std::ofstream ostrm(local_file + "_decompressed", std::ios::binary); // TODO: retry on failure
readBZ2File(local_file, ostrm); file_ready = httpMultiPartDownload(file, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_);
} }
if (--downloading_ == 0 && !aborting_) {
load();
}
}))->start();
}
// load concurrency if (!aborting_ && file_ready) {
void Segment::load() { if (id < MAX_CAMERAS) {
std::vector<std::future<bool>> futures; frames[id] = std::make_unique<FrameReader>();
success_ = success_ && frames[id]->load(local_file);
futures.emplace_back(std::async(std::launch::async, [=]() { } else {
const std::string bzip_file = localPath(log_path_).toStdString(); std::string decompressed = cacheFilePath(local_file + ".decompressed");
const std::string decompressed_file = bzip_file + "_decompressed"; if (!util::file_exists(decompressed)) {
bool is_bzip = !util::file_exists(decompressed_file); std::ofstream ostrm(decompressed, std::ios::binary);
log = std::make_unique<LogReader>(); readBZ2File(local_file, ostrm);
return log->load(is_bzip ? bzip_file : decompressed_file, is_bzip); }
})); log = std::make_unique<LogReader>();
success_ = success_ && log->load(decompressed);
QString camera_files[] = {road_cam_path_, files_.driver_cam, files_.wide_road_cam};
for (int i = 0; i < std::size(camera_files); ++i) {
if (!camera_files[i].isEmpty()) {
futures.emplace_back(std::async(std::launch::async, [=]() {
frames[i] = std::make_unique<FrameReader>();
return frames[i]->load(localPath(camera_files[i]).toStdString());
}));
} }
} }
int success_cnt = std::accumulate(futures.begin(), futures.end(), 0, [=](int v, auto &f) { return f.get() + v; }); if (!aborting_ && --loading_ == 0) {
loaded_ = (success_cnt == futures.size()); emit loadFinished(success_);
emit loadFinished(); }
} }
QString Segment::localPath(const QUrl &url) { std::string Segment::cacheFilePath(const std::string &file) {
if (url.isLocalFile() || QFile(url.toString()).exists()) return url.toString(); QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery);
QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex();
QByteArray url_no_query = url.toString(QUrl::RemoveQuery).toUtf8(); return CACHE_DIR.filePath(sha256 + "." + QFileInfo(url_no_query).suffix()).toStdString();
return CACHE_DIR.filePath(QString(QCryptographicHash::hash(url_no_query, QCryptographicHash::Sha256).toHex()));
} }

@ -1,16 +1,13 @@
#pragma once #pragma once
#include <QDir> #include <QDir>
#include <QObject> #include <QFutureSynchronizer>
#include <QString>
#include <vector>
#include "selfdrive/common/util.h" #include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/framereader.h" #include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/logreader.h" #include "selfdrive/ui/replay/logreader.h"
const QDir CACHE_DIR(util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str()); const QDir CACHE_DIR(util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str());
const int connections_per_file = 3;
struct SegmentFile { struct SegmentFile {
QString rlog; QString rlog;
@ -23,45 +20,42 @@ struct SegmentFile {
class Route { class Route {
public: public:
Route(const QString &route, const QString &data_dir = {}); Route(const QString &route, const QString &data_dir = {}) : route_(route), data_dir_(data_dir) {};
bool load(); bool load();
inline const QString &name() const { return route_; }; inline const QString &name() const { return route_; };
inline int size() const { return segments_.size(); } inline const std::map<int, SegmentFile> &segments() const { return segments_; }
inline SegmentFile &at(int n) { return segments_[n]; } inline const SegmentFile &at(int n) { return segments_.at(n); }
protected: protected:
bool loadFromLocal(); bool loadFromLocal();
bool loadFromServer();
bool loadFromJson(const QString &json); bool loadFromJson(const QString &json);
void addFileToSegment(int seg_num, const QString &file);
QString route_; QString route_;
QString data_dir_; QString data_dir_;
std::vector<SegmentFile> segments_; std::map<int, SegmentFile> segments_;
}; };
class Segment : public QObject { class Segment : public QObject {
Q_OBJECT Q_OBJECT
public: public:
Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam); Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam);
~Segment(); ~Segment();
inline bool isLoaded() const { return loaded_; } inline bool isLoaded() const { return !loading_ && success_; }
const int seg_num = 0;
std::unique_ptr<LogReader> log; std::unique_ptr<LogReader> log;
std::unique_ptr<FrameReader> frames[MAX_CAMERAS] = {}; std::unique_ptr<FrameReader> frames[MAX_CAMERAS] = {};
signals: signals:
void loadFinished(); void loadFinished(bool success);
protected: protected:
void load(); void loadFile(int id, const std::string file);
void downloadFile(const QString &url); std::string cacheFilePath(const std::string &file);
QString localPath(const QUrl &url);
std::atomic<bool> loaded_ = false; std::atomic<bool> success_ = true, aborting_ = false;
std::atomic<bool> aborting_ = false; std::atomic<int> loading_ = 0;
std::atomic<int> downloading_ = 0; QFutureSynchronizer<void> synchronizer_;
int seg_num_ = 0;
SegmentFile files_;
QString road_cam_path_;
QString log_path_;
std::vector<QThread*> download_threads_;
}; };

@ -14,7 +14,6 @@ std::string sha_256(const QString &dat) {
TEST_CASE("httpMultiPartDownload") { TEST_CASE("httpMultiPartDownload") {
char filename[] = "/tmp/XXXXXX"; char filename[] = "/tmp/XXXXXX";
int fd = mkstemp(filename); int fd = mkstemp(filename);
REQUIRE(fd != -1);
close(fd); close(fd);
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
@ -54,7 +53,7 @@ bool is_events_ordered(const std::vector<Event *> &events) {
TEST_CASE("Segment") { TEST_CASE("Segment") {
Route demo_route(DEMO_ROUTE); Route demo_route(DEMO_ROUTE);
REQUIRE(demo_route.load()); REQUIRE(demo_route.load());
REQUIRE(demo_route.size() == 11); REQUIRE(demo_route.segments().size() == 11);
QEventLoop loop; QEventLoop loop;
Segment segment(0, demo_route.at(0), false, false); Segment segment(0, demo_route.at(0), false, false);
@ -132,7 +131,6 @@ void TestReplay::test_seek() {
segments_.erase(n); segments_.erase(n);
} }
for (int i =0; i < 50; ++i) { for (int i =0; i < 50; ++i) {
testSeekTo(520);
testSeekTo(random_int(4 * 60, 9 * 60)); testSeekTo(random_int(4 * 60, 9 * 60));
} }
loop.quit(); loop.quit();

@ -49,10 +49,12 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
int64_t content_length = getDownloadContentLength(url); int64_t content_length = getDownloadContentLength(url);
if (content_length == -1) return false; if (content_length == -1) return false;
// create a tmp sparse file
std::string tmp_file = target_file + ".tmp"; std::string tmp_file = target_file + ".tmp";
FILE *fp = fopen(tmp_file.c_str(), "wb"); FILE *fp = fopen(tmp_file.c_str(), "wb");
// create a sparse file assert(fp);
fseek(fp, content_length, SEEK_SET); fseek(fp, content_length - 1, SEEK_SET);
fwrite("\0", 1, 1, fp);
CURLM *cm = curl_multi_init(); CURLM *cm = curl_multi_init();
std::map<CURL *, MultiPartWriter> writers; std::map<CURL *, MultiPartWriter> writers;

Loading…
Cancel
Save