diff --git a/selfdrive/ui/SConscript b/selfdrive/ui/SConscript index 62f93c13a9..ed057bf54b 100644 --- a/selfdrive/ui/SConscript +++ b/selfdrive/ui/SConscript @@ -115,10 +115,10 @@ if GetOption('extras'): if arch in ['x86_64', 'Darwin'] or GetOption('extras'): qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] - replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/filereader.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"] + replay_lib_src = ["replay/replay.cc", "replay/consoleui.cc", "replay/camera.cc", "replay/filereader.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs) - replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv'] + qt_libs + replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + qt_libs qt_env.Program("replay/replay", ["replay/main.cc"], LIBS=replay_libs) qt_env.Program("watch3", ["watch3.cc"], LIBS=qt_libs + ['common', 'json11', 'zmq', 'visionipc', 'messaging']) diff --git a/selfdrive/ui/replay/camera.cc b/selfdrive/ui/replay/camera.cc index 1912c9b581..9bcc00583e 100644 --- a/selfdrive/ui/replay/camera.cc +++ b/selfdrive/ui/replay/camera.cc @@ -1,7 +1,7 @@ #include "selfdrive/ui/replay/camera.h" +#include "selfdrive/ui/replay/util.h" #include -#include CameraServer::CameraServer(std::pair camera_size[MAX_CAMERAS], bool send_yuv) : send_yuv(send_yuv) { for (int i = 0; i < MAX_CAMERAS; ++i) { @@ -24,7 +24,7 @@ void CameraServer::startVipcServer() { vipc_server_.reset(new VisionIpcServer("camerad")); for (auto &cam : cameras_) { if (cam.width > 0 && cam.height > 0) { - std::cout << "camera[" << cam.type << "] frame size " << cam.width << "x" << cam.height << std::endl; + rInfo("camera[%d] frame size %dx%d", cam.type, cam.width, cam.height); vipc_server_->create_buffers(cam.rgb_type, UI_BUF_COUNT, true, cam.width, cam.height); if (send_yuv) { vipc_server_->create_buffers(cam.yuv_type, YUV_BUFFER_COUNT, false, cam.width, cam.height); @@ -61,7 +61,7 @@ void CameraServer::cameraThread(Camera &cam) { if (rgb) vipc_server_->send(rgb, &extra, false); if (yuv) vipc_server_->send(yuv, &extra, false); } else { - std::cout << "camera[" << cam.type << "] failed to get frame:" << eidx.getSegmentId() << std::endl; + rError("camera[%d] failed to get frame:", cam.type, eidx.getSegmentId()); } cam.cached_id = id + 1; diff --git a/selfdrive/ui/replay/consoleui.cc b/selfdrive/ui/replay/consoleui.cc new file mode 100644 index 0000000000..fbb9439c90 --- /dev/null +++ b/selfdrive/ui/replay/consoleui.cc @@ -0,0 +1,353 @@ +#include "selfdrive/ui/replay/consoleui.h" + +#include +#include + +#include "selfdrive/common/version.h" + +namespace { + +const int BORDER_SIZE = 3; + +const std::initializer_list> keyboard_shortcuts[] = { + { + {"s", "+10s"}, + {"shift+s", "-10s"}, + {"m", "+60s"}, + {"shift+m", "-60s"}, + {"p", "Pause/Resume"}, + {"e", "Next Engagement"}, + {"d", "Next Disengagement"}, + }, + { + {"enter", "Enter seek request"}, + {"x", "+/-Replay speed"}, + {"q", "Exit"}, + }, +}; + +enum Color { + Default, + Debug, + Yellow, + Green, + Red, + BrightWhite, + Engaged, + Disengaged, +}; + +void add_str(WINDOW *w, const char *str, Color color = Color::Default, bool bold = false) { + if (color != Color::Default) wattron(w, COLOR_PAIR(color)); + if (bold) wattron(w, A_BOLD); + waddstr(w, str); + if (bold) wattroff(w, A_BOLD); + if (color != Color::Default) wattroff(w, COLOR_PAIR(color)); +} + +std::string format_seconds(int s) { + int total_minutes = s / 60; + int seconds = s % 60; + int hours = total_minutes / 60; + int minutes = total_minutes % 60; + return util::string_format("%02d:%02d:%02d", hours, minutes, seconds); +} + +} // namespace + +ConsoleUI::ConsoleUI(Replay *replay, QObject *parent) : replay(replay), sm({"carState", "liveParameters"}), QObject(parent) { + // Initialize curses + initscr(); + clear(); + curs_set(false); + cbreak(); // Line buffering disabled. pass on everything + noecho(); + keypad(stdscr, true); + nodelay(stdscr, true); // non-blocking getchar() + + // Initialize all the colors. https://www.ditig.com/256-colors-cheat-sheet + start_color(); + init_pair(Color::Debug, 246, COLOR_BLACK); // #949494 + init_pair(Color::Yellow, 184, COLOR_BLACK); + init_pair(Color::Red, COLOR_RED, COLOR_BLACK); + init_pair(Color::BrightWhite, 15, COLOR_BLACK); + init_pair(Color::Disengaged, COLOR_BLUE, COLOR_BLUE); + init_pair(Color::Engaged, 28, 28); + init_pair(Color::Green, 34, COLOR_BLACK); + + initWindows(); + + qRegisterMetaType("uint64_t"); + qRegisterMetaType("ReplyMsgType"); + installMessageHandler([this](ReplyMsgType type, const std::string msg) { + emit logMessageSignal(type, QString::fromStdString(msg)); + }); + installDownloadProgressHandler([this](uint64_t cur, uint64_t total, bool success) { + emit updateProgressBarSignal(cur, total, success); + }); + + QObject::connect(replay, &Replay::streamStarted, this, &ConsoleUI::updateSummary); + QObject::connect(¬ifier, SIGNAL(activated(int)), SLOT(readyRead())); + QObject::connect(this, &ConsoleUI::updateProgressBarSignal, this, &ConsoleUI::updateProgressBar); + QObject::connect(this, &ConsoleUI::logMessageSignal, this, &ConsoleUI::logMessage); + + sm_timer.callOnTimeout(this, &ConsoleUI::updateStatus); + sm_timer.start(100); + getch_timer.start(1000, this); + readyRead(); +} + +ConsoleUI::~ConsoleUI() { + endwin(); +} + +void ConsoleUI::initWindows() { + getmaxyx(stdscr, max_height, max_width); + w.fill(nullptr); + w[Win::Title] = newwin(1, max_width, 0, 0); + w[Win::Stats] = newwin(2, max_width - 2 * BORDER_SIZE, 2, BORDER_SIZE); + w[Win::Timeline] = newwin(4, max_width - 2 * BORDER_SIZE, 5, BORDER_SIZE); + w[Win::TimelineDesc] = newwin(1, 100, 10, BORDER_SIZE); + w[Win::CarState] = newwin(3, 100, 12, BORDER_SIZE); + w[Win::DownloadBar] = newwin(1, 100, 16, BORDER_SIZE); + if (int log_height = max_height - 27; log_height > 4) { + w[Win::LogBorder] = newwin(log_height, max_width - 2 * (BORDER_SIZE - 1), 17, BORDER_SIZE - 1); + box(w[Win::LogBorder], 0, 0); + w[Win::Log] = newwin(log_height - 2, max_width - 2 * BORDER_SIZE, 18, BORDER_SIZE); + scrollok(w[Win::Log], true); + } + w[Win::Help] = newwin(5, max_width - (2 * BORDER_SIZE), max_height - 6, BORDER_SIZE); + + // set the title bar + wbkgd(w[Win::Title], A_REVERSE); + mvwprintw(w[Win::Title], 0, 3, "openpilot replay %s", COMMA_VERSION); + + // show windows on the real screen + refresh(); + displayTimelineDesc(); + displayHelp(); + updateSummary(); + updateTimeline(); + for (auto win : w) { + if (win) wrefresh(win); + } +} + +void ConsoleUI::timerEvent(QTimerEvent *ev) { + if (ev->timerId() != getch_timer.timerId()) return; + + if (is_term_resized(max_height, max_width)) { + for (auto win : w) { + if (win) delwin(win); + } + endwin(); + clear(); + refresh(); + initWindows(); + rWarning("resize term %dx%d", max_height, max_width); + } + updateTimeline(); +} + +void ConsoleUI::updateStatus() { + auto write_item = [this](int y, int x, const char *key, const std::string &value, const char *unit, + bool bold = false, Color color = Color::BrightWhite) { + auto win = w[Win::CarState]; + wmove(win, y, x); + add_str(win, key); + add_str(win, value.c_str(), color, bold); + add_str(win, unit); + }; + static const std::pair status_text[] = { + {"loading...", Color::Red}, + {"playing", Color::Green}, + {"paused...", Color::Yellow}, + }; + + sm.update(0); + + if (status != Status::Paused) { + status = (sm.updated("carState") || sm.updated("liveParameters")) ? Status::Playing : Status::Waiting; + } + auto [status_str, status_color] = status_text[status]; + write_item(0, 0, "STATUS: ", status_str, " ", false, status_color); + std::string suffix = util::string_format(" / %s [%d/%d] ", format_seconds(replay->totalSeconds()).c_str(), + replay->currentSeconds() / 60, replay->route()->segments().size()); + write_item(0, 25, "TIME: ", format_seconds(replay->currentSeconds()), suffix.c_str(), true); + + auto p = sm["liveParameters"].getLiveParameters(); + write_item(1, 0, "STIFFNESS: ", util::string_format("%.2f %%", p.getStiffnessFactor() * 100), " "); + write_item(1, 25, "SPEED: ", util::string_format("%.2f", sm["carState"].getCarState().getVEgo()), " m/s"); + write_item(2, 0, "STEER RATIO: ", util::string_format("%.2f", p.getSteerRatio()), ""); + auto angle_offsets = util::string_format("%.2f|%.2f", p.getAngleOffsetAverageDeg(), p.getAngleOffsetDeg()); + write_item(2, 25, "ANGLE OFFSET(AVG|INSTANT): ", angle_offsets, " deg"); + + wrefresh(w[Win::CarState]); +} + +void ConsoleUI::displayHelp() { + for (int i = 0; i < std::size(keyboard_shortcuts); ++i) { + wmove(w[Win::Help], i * 2, 0); + for (auto &[key, desc] : keyboard_shortcuts[i]) { + wattron(w[Win::Help], A_REVERSE); + waddstr(w[Win::Help], (' ' + key + ' ').c_str()); + wattroff(w[Win::Help], A_REVERSE); + waddstr(w[Win::Help], (' ' + desc + ' ').c_str()); + } + } + wrefresh(w[Win::Help]); +} + +void ConsoleUI::displayTimelineDesc() { + std::tuple indicators[]{ + {Color::Engaged, " Engaged ", false}, + {Color::Disengaged, " Disengaged ", false}, + {Color::Green, " Info ", true}, + {Color::Yellow, " Warning ", true}, + {Color::Red, " Critical ", true}, + }; + for (auto [color, name, bold] : indicators) { + add_str(w[Win::TimelineDesc], "__", color, bold); + add_str(w[Win::TimelineDesc], name); + } +} + +void ConsoleUI::logMessage(ReplyMsgType type, const QString &msg) { + if (auto win = w[Win::Log]) { + Color color = Color::Default; + if (type == ReplyMsgType::Debug) { + color = Color::Debug; + } else if (type == ReplyMsgType::Warning) { + color = Color::Yellow; + } else if (type == ReplyMsgType::Critical) { + color = Color::Red; + } + add_str(win, qPrintable(msg + "\n"), color); + wrefresh(win); + } +} + +void ConsoleUI::updateProgressBar(uint64_t cur, uint64_t total, bool success) { + werase(w[Win::DownloadBar]); + if (success && cur < total) { + const int width = 35; + const float progress = cur / (double)total; + const int pos = width * progress; + wprintw(w[Win::DownloadBar], "Downloading [%s>%s] %d%% %s", std::string(pos, '=').c_str(), + std::string(width - pos, ' ').c_str(), int(progress * 100.0), formattedDataSize(total).c_str()); + } + wrefresh(w[Win::DownloadBar]); +} + +void ConsoleUI::updateSummary() { + const auto &route = replay->route(); + mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %d segments", qPrintable(route->name()), route->segments().size()); + mvwprintw(w[Win::Stats], 1, 0, "Car Fingerprint: %s", replay->carFingerprint().c_str()); + wrefresh(w[Win::Stats]); +} + +void ConsoleUI::updateTimeline() { + auto win = w[Win::Timeline]; + int width = getmaxx(win); + werase(win); + + wattron(win, COLOR_PAIR(Color::Disengaged)); + mvwhline(win, 1, 0, ' ', width); + mvwhline(win, 2, 0, ' ', width); + wattroff(win, COLOR_PAIR(Color::Disengaged)); + + const int total_sec = replay->totalSeconds(); + for (auto [begin, end, type] : replay->getTimeline()) { + int start_pos = ((double)begin / total_sec) * width; + int end_pos = ((double)end / total_sec) * width; + if (type == TimelineType::Engaged) { + mvwchgat(win, 1, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL); + mvwchgat(win, 2, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL); + } else { + auto color_id = Color::Green; + if (type != TimelineType::AlertInfo) { + color_id = type == TimelineType::AlertWarning ? Color::Yellow : Color::Red; + } + mvwchgat(win, 3, start_pos, end_pos - start_pos + 1, ACS_S3, color_id, NULL); + } + } + + int cur_pos = ((double)replay->currentSeconds() / total_sec) * width; + wattron(win, COLOR_PAIR(Color::BrightWhite)); + mvwaddch(win, 0, cur_pos, ACS_VLINE); + mvwaddch(win, 3, cur_pos, ACS_VLINE); + wattroff(win, COLOR_PAIR(Color::BrightWhite)); + wrefresh(win); +} + +void ConsoleUI::readyRead() { + int c; + while ((c = getch()) != ERR) { + handleKey(c); + } +} + +void ConsoleUI::pauseReplay(bool pause) { + replay->pause(pause); + status = pause ? Status::Paused : Status::Waiting; +} + +void ConsoleUI::handleKey(char c) { + if (c == '\n') { + // pause the replay and blocking getchar() + pauseReplay(true); + updateStatus(); + getch_timer.stop(); + curs_set(true); + nodelay(stdscr, false); + + // Wait for user input + rWarning("Waiting for input..."); + int y = getmaxy(stdscr) - 9; + move(y, BORDER_SIZE); + add_str(stdscr, "Enter seek request: ", Color::BrightWhite, true); + refresh(); + + // Seek to choice + echo(); + int choice = 0; + scanw((char *)"%d", &choice); + noecho(); + pauseReplay(false); + replay->seekTo(choice, false); + + // Clean up and turn off the blocking mode + move(y, 0); + clrtoeol(); + nodelay(stdscr, true); + curs_set(false); + refresh(); + getch_timer.start(1000, this); + + } else if (c == 'x') { + if (replay->hasFlag(REPLAY_FLAG_FULL_SPEED)) { + replay->removeFlag(REPLAY_FLAG_FULL_SPEED); + rWarning("replay at normal speed"); + } else { + replay->addFlag(REPLAY_FLAG_FULL_SPEED); + rWarning("replay at full speed"); + } + } else if (c == 'e') { + replay->seekToFlag(FindFlag::nextEngagement); + } else if (c == 'd') { + replay->seekToFlag(FindFlag::nextDisEngagement); + } else if (c == 'm') { + replay->seekTo(+60, true); + } else if (c == 'M') { + replay->seekTo(-60, true); + } else if (c == 's') { + replay->seekTo(+10, true); + } else if (c == 'S') { + replay->seekTo(-10, true); + } else if (c == ' ') { + pauseReplay(!replay->isPaused()); + } else if (c == 'q' || c == 'Q') { + replay->stop(); + qApp->exit(); + } +} diff --git a/selfdrive/ui/replay/consoleui.h b/selfdrive/ui/replay/consoleui.h new file mode 100644 index 0000000000..bce1146d46 --- /dev/null +++ b/selfdrive/ui/replay/consoleui.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "selfdrive/ui/replay/replay.h" +#include + +class ConsoleUI : public QObject { + Q_OBJECT + +public: + ConsoleUI(Replay *replay, QObject *parent = 0); + ~ConsoleUI(); + +private: + void initWindows(); + void handleKey(char c); + void displayHelp(); + void displayTimelineDesc(); + void updateTimeline(); + void updateSummary(); + void updateStatus(); + void pauseReplay(bool pause); + + enum Status { Waiting, Playing, Paused }; + enum Win { Title, Stats, Log, LogBorder, DownloadBar, Timeline, TimelineDesc, Help, CarState, Max}; + std::array w{}; + SubMaster sm; + Replay *replay; + QBasicTimer getch_timer; + QTimer sm_timer; + QSocketNotifier notifier{0, QSocketNotifier::Read, this}; + int max_width, max_height; + Status status = Status::Waiting; + +signals: + void updateProgressBarSignal(uint64_t cur, uint64_t total, bool success); + void logMessageSignal(ReplyMsgType type, const QString &msg); + +private slots: + void readyRead(); + void timerEvent(QTimerEvent *ev); + void updateProgressBar(uint64_t cur, uint64_t total, bool success); + void logMessage(ReplyMsgType type, const QString &msg); +}; diff --git a/selfdrive/ui/replay/filereader.cc b/selfdrive/ui/replay/filereader.cc index 1585bb42d2..b57a62d501 100644 --- a/selfdrive/ui/replay/filereader.cc +++ b/selfdrive/ui/replay/filereader.cc @@ -1,7 +1,6 @@ #include "selfdrive/ui/replay/filereader.h" #include -#include #include "selfdrive/common/util.h" #include "selfdrive/ui/replay/util.h" @@ -35,13 +34,12 @@ std::string FileReader::read(const std::string &file, std::atomic *abort) std::string FileReader::download(const std::string &url, std::atomic *abort) { for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) { + if (i > 0) rWarning("download failed, retrying %d", i); + std::string result = httpGet(url, chunk_size_, abort); if (!result.empty()) { return result; } - if (i != max_retries_ && !(abort && *abort)) { - std::cout << "download failed, retrying " << i + 1 << std::endl; - } } return {}; } diff --git a/selfdrive/ui/replay/framereader.cc b/selfdrive/ui/replay/framereader.cc index 59e43cb58f..6c88f2cab5 100644 --- a/selfdrive/ui/replay/framereader.cc +++ b/selfdrive/ui/replay/framereader.cc @@ -1,4 +1,5 @@ #include "selfdrive/ui/replay/framereader.h" +#include "selfdrive/ui/replay/util.h" #include #include "libyuv.h" @@ -29,7 +30,7 @@ enum AVPixelFormat get_hw_format(AVCodecContext *ctx, const enum AVPixelFormat * for (const enum AVPixelFormat *p = pix_fmts; *p != -1; p++) { if (*p == *hw_pix_fmt) return *p; } - printf("Please run replay with the --no-cuda flag!\n"); + rWarning("Please run replay with the --no-cuda flag!"); // fallback to YUV420p *hw_pix_fmt = AV_PIX_FMT_NONE; return AV_PIX_FMT_YUV420P; @@ -37,7 +38,9 @@ enum AVPixelFormat get_hw_format(AVCodecContext *ctx, const enum AVPixelFormat * } // namespace -FrameReader::FrameReader() {} +FrameReader::FrameReader() { + av_log_set_level(AV_LOG_QUIET); +} FrameReader::~FrameReader() { for (AVPacket *pkt : packets) { @@ -81,13 +84,13 @@ bool FrameReader::load(const std::byte *data, size_t size, bool no_cuda, std::at if (ret != 0) { char err_str[1024] = {0}; av_strerror(ret, err_str, std::size(err_str)); - printf("Error loading video - %s\n", err_str); + rWarning("Error loading video - %s", err_str); return false; } ret = avformat_find_stream_info(input_ctx, nullptr); if (ret < 0) { - printf("cannot find a video stream in the input file\n"); + rWarning("cannot find a video stream in the input file"); return false; } @@ -105,7 +108,7 @@ bool FrameReader::load(const std::byte *data, size_t size, bool no_cuda, std::at if (has_cuda_device && !no_cuda) { if (!initHardwareDecoder(AV_HWDEVICE_TYPE_CUDA)) { - printf("No CUDA capable device was found. fallback to CPU decoding.\n"); + rWarning("No CUDA capable device was found. fallback to CPU decoding."); } else { nv12toyuv_buffer.resize(getYUVSize()); } @@ -135,8 +138,8 @@ bool FrameReader::initHardwareDecoder(AVHWDeviceType hw_device_type) { for (int i = 0;; i++) { const AVCodecHWConfig *config = avcodec_get_hw_config(decoder_ctx->codec, i); if (!config) { - printf("decoder %s does not support hw device type %s.\n", - decoder_ctx->codec->name, av_hwdevice_get_type_name(hw_device_type)); + rWarning("decoder %s does not support hw device type %s.", decoder_ctx->codec->name, + av_hwdevice_get_type_name(hw_device_type)); return false; } if (config->methods & AV_CODEC_HW_CONFIG_METHOD_HW_DEVICE_CTX && config->device_type == hw_device_type) { @@ -149,7 +152,7 @@ bool FrameReader::initHardwareDecoder(AVHWDeviceType hw_device_type) { if (ret < 0) { hw_pix_fmt = AV_PIX_FMT_NONE; has_cuda_device = false; - printf("Failed to create specified HW device %d.\n", ret); + rWarning("Failed to create specified HW device %d.", ret); return false; } @@ -192,7 +195,7 @@ bool FrameReader::decode(int idx, uint8_t *rgb, uint8_t *yuv) { AVFrame *FrameReader::decodeFrame(AVPacket *pkt) { int ret = avcodec_send_packet(decoder_ctx, pkt); if (ret < 0) { - printf("Error sending a packet for decoding\n"); + rWarning("Error sending a packet for decoding"); return nullptr; } @@ -205,7 +208,7 @@ AVFrame *FrameReader::decodeFrame(AVPacket *pkt) { if (av_frame_->format == hw_pix_fmt) { hw_frame.reset(av_frame_alloc()); if ((ret = av_hwframe_transfer_data(hw_frame.get(), av_frame_.get(), 0)) < 0) { - printf("error transferring the data from GPU to CPU\n"); + rWarning("error transferring the data from GPU to CPU"); return nullptr; } return hw_frame.get(); diff --git a/selfdrive/ui/replay/logreader.cc b/selfdrive/ui/replay/logreader.cc index d836ef11f8..579fe50644 100644 --- a/selfdrive/ui/replay/logreader.cc +++ b/selfdrive/ui/replay/logreader.cc @@ -1,7 +1,6 @@ #include "selfdrive/ui/replay/logreader.h" #include -#include #include "selfdrive/ui/replay/util.h" Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(amsg), frame(frame) { @@ -59,7 +58,7 @@ bool LogReader::load(const std::byte *data, size_t size, std::atomic *abor raw_ = decompressBZ2(data, size, abort); if (raw_.empty()) { if (!(abort && *abort)) { - std::cout << "failed to decompress log" << std::endl; + rWarning("failed to decompress log"); } return false; } @@ -92,9 +91,9 @@ bool LogReader::load(const std::byte *data, size_t size, std::atomic *abor events.push_back(evt); } } catch (const kj::Exception &e) { - std::cout << "failed to parse log : " << e.getDescription().cStr() << std::endl; + rWarning("failed to parse log : %s", e.getDescription().cStr()); if (!events.empty()) { - std::cout << "read " << events.size() << " events from corrupt log" << std::endl; + rWarning("read %zu events from corrupt log", events.size()); } } diff --git a/selfdrive/ui/replay/main.cc b/selfdrive/ui/replay/main.cc index 180aecc7e1..a7d2f54042 100644 --- a/selfdrive/ui/replay/main.cc +++ b/selfdrive/ui/replay/main.cc @@ -1,109 +1,13 @@ -#include - #include #include -#include -#include -#include -#include +#include "selfdrive/ui/replay/consoleui.h" #include "selfdrive/ui/replay/replay.h" const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"; -struct termios oldt = {}; -Replay *replay = nullptr; - -void sigHandler(int s) { - std::signal(s, SIG_DFL); - if (oldt.c_lflag) { - tcsetattr(STDIN_FILENO, TCSANOW, &oldt); - } - replay->stop(); - qApp->quit(); -} - -int getch() { - int ch; - struct termios newt; - - tcgetattr(STDIN_FILENO, &oldt); - newt = oldt; - newt.c_lflag &= ~(ICANON | ECHO); - - tcsetattr(STDIN_FILENO, TCSANOW, &newt); - ch = getchar(); - tcsetattr(STDIN_FILENO, TCSANOW, &oldt); - - return ch; -} - -void keyboardThread(Replay *replay_) { - char c; - while (true) { - c = getch(); - if (c == '\n') { - printf("Enter seek request: "); - std::string r; - std::cin >> r; - - try { - if (r[0] == '#') { - r.erase(0, 1); - replay_->seekTo(std::stoi(r) * 60, false); - } else { - replay_->seekTo(std::stoi(r), false); - } - } catch (std::invalid_argument) { - qDebug() << "invalid argument"; - } - getch(); // remove \n from entering seek - } else if (c == 'e') { - replay_->seekToFlag(FindFlag::nextEngagement); - } else if (c == 'd') { - replay_->seekToFlag(FindFlag::nextDisEngagement); - } else if (c == 'm') { - replay_->seekTo(+60, true); - } else if (c == 'M') { - replay_->seekTo(-60, true); - } else if (c == 's') { - replay_->seekTo(+10, true); - } else if (c == 'S') { - replay_->seekTo(-10, true); - } else if (c == 'G') { - replay_->seekTo(0, true); - } else if (c == 'x') { - if (replay_->hasFlag(REPLAY_FLAG_FULL_SPEED)) { - replay_->removeFlag(REPLAY_FLAG_FULL_SPEED); - qInfo() << "replay at normal speed"; - } else { - replay_->addFlag(REPLAY_FLAG_FULL_SPEED); - qInfo() << "replay at full speed"; - } - } else if (c == ' ') { - replay_->pause(!replay_->isPaused()); - } - } -} - -void replayMessageOutput(QtMsgType type, const QMessageLogContext &context, const QString &msg) { - QByteArray localMsg = msg.toLocal8Bit(); - if (type == QtDebugMsg) { - std::cout << "\033[38;5;248m" << localMsg.constData() << "\033[00m" << std::endl; - } else if (type == QtWarningMsg) { - std::cout << "\033[38;5;227m" << localMsg.constData() << "\033[00m" << std::endl; - } else if (type == QtCriticalMsg) { - std::cout << "\033[38;5;196m" << localMsg.constData() << "\033[00m" << std::endl; - } else { - std::cout << localMsg.constData() << std::endl; - } -} int main(int argc, char *argv[]) { - qInstallMessageHandler(replayMessageOutput); - QApplication app(argc, argv); - std::signal(SIGINT, sigHandler); - std::signal(SIGTERM, sigHandler); const std::tuple flags[] = { {"dcam", REPLAY_FLAG_DCAM, "load driver camera"}, @@ -145,16 +49,12 @@ int main(int argc, char *argv[]) { replay_flags |= flag; } } - replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app); + Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app); if (!replay->load()) { return 0; } - replay->start(parser.value("start").toInt()); - // start keyboard control thread - QThread *t = new QThread(); - QObject::connect(t, &QThread::started, [=]() { keyboardThread(replay); }); - QObject::connect(t, &QThread::finished, t, &QThread::deleteLater); - t->start(); + ConsoleUI console_ui(replay); + replay->start(parser.value("start").toInt()); return app.exec(); } diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 98e211fb53..0bfcdbe2b7 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -1,7 +1,7 @@ #include "selfdrive/ui/replay/replay.h" -#include #include +#include #include #include "cereal/services.h" @@ -23,6 +23,7 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s } } qDebug() << "services " << s; + qDebug() << "loading route " << route; if (sm == nullptr) { pm = std::make_unique(s); @@ -33,19 +34,17 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s qRegisterMetaType("FindFlag"); connect(this, &Replay::seekTo, this, &Replay::doSeek); - connect(this, &Replay::seekToFlag, this, &Replay::doSeekToFlag); - connect(this, &Replay::stop, this, &Replay::doStop); connect(this, &Replay::segmentChanged, this, &Replay::queueSegment); } Replay::~Replay() { - doStop(); + stop(); } -void Replay::doStop() { +void Replay::stop() { if (!stream_thread_ && segments_.empty()) return; - qDebug() << "shutdown: in progress..."; + rInfo("shutdown: in progress..."); if (stream_thread_ != nullptr) { exit_ = updating_events_ = true; stream_cv_.notify_one(); @@ -55,7 +54,8 @@ void Replay::doStop() { } segments_.clear(); camera_server_.reset(nullptr); - qDebug() << "shutdown: done"; + timeline_future.waitForFinished(); + rInfo("shutdown: done"); } bool Replay::load() { @@ -75,7 +75,7 @@ bool Replay::load() { qCritical() << "no valid segments in route" << route_->name(); return false; } - qInfo() << "load route" << route_->name() << "with" << segments_.size() << "valid segments"; + rInfo("load route %s with %zu valid segments", qPrintable(route_->name()), segments_.size()); return true; } @@ -104,11 +104,11 @@ void Replay::doSeek(int seconds, bool relative) { seconds = std::max(0, seconds); int seg = seconds / 60; if (segments_.find(seg) == segments_.end()) { - qWarning() << "can't seek to" << seconds << "s, segment" << seg << "is invalid"; + rWarning("can't seek to %d s segment %d is invalid", seconds, seg); return true; } - qInfo() << "seeking to" << seconds << "s, segment" << seg; + rWarning("seeking to %d s, segment %d", seconds, seg); current_segment_ = seg; cur_mono_time_ = route_start_ts_ + seconds * 1e9; return isSegmentMerged(seg); @@ -116,61 +116,82 @@ void Replay::doSeek(int seconds, bool relative) { queueSegment(); } -void Replay::doSeekToFlag(FindFlag flag) { +void Replay::seekToFlag(FindFlag flag) { if (flag == FindFlag::nextEngagement) { - qInfo() << "seeking to the next engagement..."; + rWarning("seeking to the next engagement..."); } else if (flag == FindFlag::nextDisEngagement) { - qInfo() << "seeking to the disengagement..."; + rWarning("seeking to the disengagement..."); } - updateEvents([&]() { if (auto next = find(flag)) { - uint64_t tm = *next - 2 * 1e9; // seek to 2 seconds before next - if (tm <= cur_mono_time_) { - return true; - } - - cur_mono_time_ = tm; + int tm = *next - 2; // seek to 2 seconds before next + cur_mono_time_ = (route_start_ts_ + tm * 1e9); current_segment_ = currentSeconds() / 60; - return isSegmentMerged(current_segment_); } else { - qWarning() << "seeking failed"; - return true; + rWarning("seeking failed"); } + return isSegmentMerged(current_segment_); }); queueSegment(); } -std::optional Replay::find(FindFlag flag) { - // Search in all segments - for (const auto &[n, _] : segments_) { - if (n < current_segment_) continue; +void Replay::buildTimeline() { + uint64_t engaged_begin = 0; + uint64_t alert_begin = 0; + TimelineType alert_type = TimelineType::None; + for (int i = 0; i < segments_.size() && !exit_; ++i) { LogReader log; - bool cache_to_local = true; // cache qlog to local for fast seek - if (!log.load(route_->at(n).qlog.toStdString(), nullptr, cache_to_local, 0, 3)) continue; + if (!log.load(route_->at(i).qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue; for (const Event *e : log.events) { - if (e->mono_time > cur_mono_time_ && e->which == cereal::Event::Which::CONTROLS_STATE) { - const auto cs = e->event.getControlsState(); - if (flag == FindFlag::nextEngagement && cs.getEnabled()) { - return e->mono_time; - } else if (flag == FindFlag::nextDisEngagement && !cs.getEnabled()) { - return e->mono_time; + if (e->which == cereal::Event::Which::CONTROLS_STATE) { + auto cs = e->event.getControlsState(); + + if (!engaged_begin && cs.getEnabled()) { + engaged_begin = e->mono_time; + } else if (engaged_begin && !cs.getEnabled()) { + std::lock_guard lk(timeline_lock); + timeline.push_back({toSeconds(engaged_begin), toSeconds(e->mono_time), TimelineType::Engaged}); + engaged_begin = 0; + } + + if (!alert_begin && cs.getAlertType().size() > 0) { + alert_begin = e->mono_time; + alert_type = TimelineType::AlertInfo; + if (cs.getAlertStatus() != cereal::ControlsState::AlertStatus::NORMAL) { + alert_type = cs.getAlertStatus() == cereal::ControlsState::AlertStatus::USER_PROMPT + ? TimelineType::AlertWarning + : TimelineType::AlertCritical; + } + } else if (alert_begin && cs.getAlertType().size() == 0) { + std::lock_guard lk(timeline_lock); + timeline.push_back({toSeconds(alert_begin), toSeconds(e->mono_time), alert_type}); + alert_begin = 0; } } } } +} + +std::optional Replay::find(FindFlag flag) { + int cur_ts = currentSeconds(); + for (auto [start_ts, end_ts, type] : getTimeline()) { + if (type == TimelineType::Engaged) { + if (flag == FindFlag::nextEngagement && start_ts > cur_ts) { + return start_ts; + } else if (flag == FindFlag::nextDisEngagement && end_ts > cur_ts) { + return end_ts; + } + } + } return std::nullopt; } void Replay::pause(bool pause) { updateEvents([=]() { - qInfo() << (pause ? "paused..." : "resuming"); - if (pause) { - qInfo() << "at " << currentSeconds() << "s"; - } + rWarning("%s at %d s", pause ? "paused..." : "resuming", currentSeconds()); paused_ = pause; return true; }); @@ -185,7 +206,7 @@ void Replay::setCurrentSegment(int n) { void Replay::segmentLoadFinished(bool success) { if (!success) { Segment *seg = qobject_cast(sender()); - qWarning() << "failed to load segment " << seg->seg_num << ", removing it from current replay list"; + rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num); segments_.erase(seg->seg_num); } queueSegment(); @@ -201,19 +222,18 @@ void Replay::queueSegment() { } // load one segment at a time for (auto it = cur; it != end; ++it) { - if (!it->second) { - if (it == cur || std::prev(it)->second->isLoaded()) { - auto &[n, seg] = *it; + auto &[n, seg] = *it; + if ((seg && !seg->isLoaded()) || !seg) { + if (!seg) { + rDebug("loading segment %d...", n); seg = std::make_unique(n, route_->at(n), flags_); QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); - qDebug() << "loading segment" << n << "..."; } break; } } - const auto &cur_segment = cur->second; - enableHttpLogging(!cur_segment->isLoaded()); + const auto &cur_segment = cur->second; // merge the previous adjacent segment if it's loaded auto begin = segments_.find(cur_segment->seg_num - 1); if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) { @@ -228,6 +248,7 @@ void Replay::queueSegment() { // start stream thread if (stream_thread_ == nullptr && cur_segment->isLoaded()) { startStream(cur_segment.get()); + emit streamStarted(); } } @@ -235,13 +256,18 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: // merge 3 segments in sequence. std::vector segments_need_merge; size_t new_events_size = 0; - for (auto it = begin; it != end && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) { + for (auto it = begin; it != end && it->second && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) { segments_need_merge.push_back(it->first); new_events_size += it->second->log->events.size(); } if (segments_need_merge != segments_merged_) { - qDebug() << "merge segments" << segments_need_merge; + std::string s; + for (int i = 0; i < segments_need_merge.size(); ++i) { + s += std::to_string(segments_need_merge[i]); + if (i != segments_need_merge.size() - 1) s += ", "; + } + rDebug("merge segments %s", s.c_str()); new_events_->clear(); new_events_->reserve(new_events_size); for (int n : segments_need_merge) { @@ -269,10 +295,11 @@ void Replay::startStream(const Segment *cur_segment) { // write CarParams it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; }); if (it != events.end()) { + car_fingerprint_ = (*it)->event.getCarParams().getCarFingerprint(); auto bytes = (*it)->bytes(); Params().put("CarParams", (const char *)bytes.begin(), bytes.size()); } else { - qWarning() << "failed to read CarParams from current segment"; + rWarning("failed to read CarParams from current segment"); } // start camera server @@ -291,6 +318,8 @@ void Replay::startStream(const Segment *cur_segment) { QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); }); QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater); stream_thread_->start(); + + timeline_future = QtConcurrent::run(this, &Replay::buildTimeline); } void Replay::publishMessage(const Event *e) { @@ -298,7 +327,7 @@ void Replay::publishMessage(const Event *e) { auto bytes = e->bytes(); int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size()); if (ret == -1) { - qDebug() << "stop publishing" << sockets_[e->which] << "due to multiple publishers error"; + rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]); sockets_[e->which] = nullptr; } } else { @@ -324,9 +353,7 @@ void Replay::publishFrame(const Event *e) { } void Replay::stream() { - float last_print = 0; cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; - std::unique_lock lk(stream_lock_); while (true) { @@ -337,7 +364,7 @@ void Replay::stream() { Event cur_event(cur_which, cur_mono_time_); auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); if (eit == events_->end()) { - qDebug() << "waiting for events..."; + rInfo("waiting for events..."); continue; } @@ -348,12 +375,7 @@ void Replay::stream() { const Event *evt = (*eit); cur_which = evt->which; cur_mono_time_ = evt->mono_time; - const int current_ts = currentSeconds(); - if (last_print > current_ts || (current_ts - last_print) > 5.0) { - last_print = current_ts; - qInfo() << "at " << current_ts << "s"; - } - setCurrentSegment(current_ts / 60); + setCurrentSegment(toSeconds(cur_mono_time_) / 60); // migration for pandaState -> pandaStates to keep UI working for old segments if (cur_which == cereal::Event::Which::PANDA_STATE_D_E_P_R_E_C_A_T_E_D) { @@ -396,7 +418,7 @@ void Replay::stream() { if (eit == events_->end() && !hasFlag(REPLAY_FLAG_NO_LOOP)) { int last_segment = segments_.rbegin()->first; if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) { - qInfo() << "reaches the end of route, restart from beginning"; + rInfo("reaches the end of route, restart from beginning"); emit seekTo(0, false); } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 4f97990506..5f61dc357d 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -28,6 +28,8 @@ enum class FindFlag { nextDisEngagement }; +enum class TimelineType { None, Engaged, AlertInfo, AlertWarning, AlertCritical }; + class Replay : public QObject { Q_OBJECT @@ -37,23 +39,31 @@ public: ~Replay(); bool load(); void start(int seconds = 0); + void stop(); void pause(bool pause); - bool isPaused() const { return paused_; } + void seekToFlag(FindFlag flag); + inline bool isPaused() const { return paused_; } inline bool hasFlag(REPLAY_FLAGS flag) const { return flags_ & flag; } inline void addFlag(REPLAY_FLAGS flag) { flags_ |= flag; } inline void removeFlag(REPLAY_FLAGS flag) { flags_ &= ~flag; } + inline const Route* route() const { return route_.get(); } + inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } + inline int toSeconds(uint64_t mono_time) const { return (mono_time - route_start_ts_) / 1e9; } + inline int totalSeconds() const { return segments_.size() * 60; } + inline const std::string &carFingerprint() const { return car_fingerprint_; } + inline const std::vector> getTimeline() { + std::lock_guard lk(timeline_lock); + return timeline; + } signals: void segmentChanged(); void seekTo(int seconds, bool relative); - void seekToFlag(FindFlag flag); - void stop(); + void streamStarted(); protected slots: void queueSegment(); - void doStop(); void doSeek(int seconds, bool relative); - void doSeekToFlag(FindFlag flag); void segmentLoadFinished(bool sucess); protected: @@ -66,7 +76,7 @@ protected: void updateEvents(const std::function& lambda); void publishMessage(const Event *e); void publishFrame(const Event *e); - inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } + void buildTimeline(); inline bool isSegmentMerged(int n) { return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end(); } @@ -80,7 +90,7 @@ protected: std::atomic current_segment_ = 0; SegmentMap segments_; // the following variables must be protected with stream_lock_ - bool exit_ = false; + std::atomic exit_ = false; bool paused_ = false; bool events_updated_ = false; uint64_t route_start_ts_ = 0; @@ -96,4 +106,9 @@ protected: std::unique_ptr route_; std::unique_ptr camera_server_; std::atomic flags_ = REPLAY_FLAG_NONE; + + std::mutex timeline_lock; + QFuture timeline_future; + std::vector> timeline; + std::string car_fingerprint_; }; diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index fe6e21a91a..e6a6177149 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -26,7 +26,7 @@ RouteIdentifier Route::parseRoute(const QString &str) { bool Route::load() { if (route_.str.isEmpty()) { - qInfo() << "invalid route format"; + rInfo("invalid route format"); return false; } return data_dir_.isEmpty() ? loadFromServer() : loadFromLocal(); diff --git a/selfdrive/ui/replay/util.cc b/selfdrive/ui/replay/util.cc index d6eba2e2a8..513cd965be 100644 --- a/selfdrive/ui/replay/util.cc +++ b/selfdrive/ui/replay/util.cc @@ -15,6 +15,40 @@ #include "selfdrive/common/timing.h" #include "selfdrive/common/util.h" +ReplayMessageHandler message_handler = nullptr; +DownloadProgressHandler download_progress_handler = nullptr; + +void installMessageHandler(ReplayMessageHandler handler) { message_handler = handler; } +void installDownloadProgressHandler(DownloadProgressHandler handler) { download_progress_handler = handler; } + +void logMessage(ReplyMsgType type, const char *fmt, ...) { + static std::mutex lock; + std::lock_guard lk(lock); + + char *msg_buf = nullptr; + va_list args; + va_start(args, fmt); + int ret = vasprintf(&msg_buf, fmt, args); + va_end(args); + if (ret <= 0 || !msg_buf) return; + + if (message_handler) { + message_handler(type, msg_buf); + } else { + if (type == ReplyMsgType::Debug) { + std::cout << "\033[38;5;248m" << msg_buf << "\033[00m" << std::endl; + } else if (type == ReplyMsgType::Warning) { + std::cout << "\033[38;5;227m" << msg_buf << "\033[00m" << std::endl; + } else if (type == ReplyMsgType::Critical) { + std::cout << "\033[38;5;196m" << msg_buf << "\033[00m" << std::endl; + } else { + std::cout << msg_buf << std::endl; + } + } + + free(msg_buf); +} + namespace { struct CURLGlobalInitializer { @@ -23,7 +57,6 @@ struct CURLGlobalInitializer { }; static CURLGlobalInitializer curl_initializer; -static std::atomic enable_http_logging = false; template struct MultiPartWriter { @@ -57,6 +90,38 @@ size_t write_cb(char *data, size_t size, size_t count, void *userp) { size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; } +struct DownloadStats { + void add(const std::string &url, uint64_t total_bytes) { + std::lock_guard lk(lock); + items[url] = {0, total_bytes}; + } + + void remove(const std::string &url) { + std::lock_guard lk(lock); + items.erase(url); + } + + void update(const std::string &url, uint64_t downloaded, bool success = true) { + std::lock_guard lk(lock); + items[url].first = downloaded; + + auto stat = std::accumulate(items.begin(), items.end(), std::pair{}, [=](auto &a, auto &b){ + return std::pair{a.first + b.second.first, a.second + b.second.second}; + }); + double tm = millis_since_boot(); + if (download_progress_handler && ((tm - prev_tm) > 500 || !success || stat.first >= stat.second)) { + download_progress_handler(stat.first, stat.second, success); + prev_tm = tm; + } + } + + std::mutex lock; + std::map> items; + double prev_tm = 0; +}; + +} // namespace + std::string formattedDataSize(size_t size) { if (size < 1024) { return std::to_string(size) + " B"; @@ -67,8 +132,6 @@ std::string formattedDataSize(size_t size) { } } -} // namespace - size_t getRemoteFileSize(const std::string &url, std::atomic *abort) { CURL *curl = curl_easy_init(); if (!curl) return -1; @@ -99,12 +162,11 @@ std::string getUrlWithoutQuery(const std::string &url) { return (idx == std::string::npos ? url : url.substr(0, idx)); } -void enableHttpLogging(bool enable) { - enable_http_logging = enable; -} - template bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t content_length, std::atomic *abort) { + static DownloadStats download_stats; + download_stats.add(url, content_length); + int parts = 1; if (chunk_size > 0 && content_length > 10 * 1024 * 1024) { parts = std::nearbyint(content_length / (float)chunk_size); @@ -134,23 +196,11 @@ bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t cont curl_multi_add_handle(cm, eh); } - size_t prev_written = 0; - double last_print = millis_since_boot(); int still_running = 1; while (still_running > 0 && !(abort && *abort)) { curl_multi_wait(cm, nullptr, 0, 1000, nullptr); curl_multi_perform(cm, &still_running); - - if (enable_http_logging) { - if (double ts = millis_since_boot(); (ts - last_print) > 2 * 1000) { - size_t average = (written - prev_written) / ((ts - last_print) / 1000.); - int progress = std::min(100, 100.0 * (double)written / (double)content_length); - std::cout << "downloading " << getUrlWithoutQuery(url) << " - " << progress - << "% (" << formattedDataSize(average) << "/s)" << std::endl; - last_print = ts; - prev_written = written; - } - } + download_stats.update(url, written); } CURLMsg *msg; @@ -164,21 +214,25 @@ bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t cont if (res_status == 206) { complete++; } else { - std::cout << "Download failed: http error code: " << res_status << std::endl; + rWarning("Download failed: http error code: %d", res_status); } } else { - std::cout << "Download failed: connection failure: " << msg->data.result << std::endl; + rWarning("Download failed: connection failure: %d", msg->data.result); } } } + bool success = complete == parts; + download_stats.update(url, written, success); + download_stats.remove(url); + for (const auto &[e, w] : writers) { curl_multi_remove_handle(cm, e); curl_easy_cleanup(e); } curl_multi_cleanup(cm); - return complete == parts; + return success; } std::string httpGet(const std::string &url, size_t chunk_size, std::atomic *abort) { @@ -221,7 +275,7 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic if (bzerror == BZ_OK && prev_write_pos == strm.next_out) { // content is corrupt bzerror = BZ_STREAM_END; - std::cout << "decompressBZ2 error : content is corrupt" << std::endl; + rWarning("decompressBZ2 error : content is corrupt"); break; } diff --git a/selfdrive/ui/replay/util.h b/selfdrive/ui/replay/util.h index cd4b179cdc..6c808095e8 100644 --- a/selfdrive/ui/replay/util.h +++ b/selfdrive/ui/replay/util.h @@ -1,14 +1,34 @@ #pragma once #include +#include #include +enum class ReplyMsgType { + Info, + Debug, + Warning, + Critical +}; + +typedef std::function ReplayMessageHandler; +void installMessageHandler(ReplayMessageHandler); +void logMessage(ReplyMsgType type, const char* fmt, ...); + +#define rInfo(fmt, ...) ::logMessage(ReplyMsgType::Info, fmt, ## __VA_ARGS__) +#define rDebug(fmt, ...) ::logMessage(ReplyMsgType::Debug, fmt, ## __VA_ARGS__) +#define rWarning(fmt, ...) ::logMessage(ReplyMsgType::Warning, fmt, ## __VA_ARGS__) +#define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__) + std::string sha256(const std::string &str); void precise_nano_sleep(long sleep_ns); std::string decompressBZ2(const std::string &in, std::atomic *abort = nullptr); std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic *abort = nullptr); -void enableHttpLogging(bool enable); std::string getUrlWithoutQuery(const std::string &url); size_t getRemoteFileSize(const std::string &url, std::atomic *abort = nullptr); std::string httpGet(const std::string &url, size_t chunk_size = 0, std::atomic *abort = nullptr); + +typedef std::function DownloadProgressHandler; +void installDownloadProgressHandler(DownloadProgressHandler); bool httpDownload(const std::string &url, const std::string &file, size_t chunk_size = 0, std::atomic *abort = nullptr); +std::string formattedDataSize(size_t size);