diff --git a/release/files_common b/release/files_common index 12bb8b2ed9..3f8c88547e 100644 --- a/release/files_common +++ b/release/files_common @@ -237,16 +237,13 @@ selfdrive/loggerd/encoder/encoder.cc selfdrive/loggerd/encoder/encoder.h selfdrive/loggerd/encoder/v4l_encoder.cc selfdrive/loggerd/encoder/v4l_encoder.h -selfdrive/loggerd/encoder/video_writer.cc -selfdrive/loggerd/encoder/video_writer.h -selfdrive/loggerd/remote_encoder.cc -selfdrive/loggerd/remote_encoder.h +selfdrive/loggerd/video_writer.cc +selfdrive/loggerd/video_writer.h selfdrive/loggerd/logger.cc selfdrive/loggerd/logger.h selfdrive/loggerd/loggerd.cc selfdrive/loggerd/loggerd.h selfdrive/loggerd/encoderd.cc -selfdrive/loggerd/main.cc selfdrive/loggerd/bootlog.cc selfdrive/loggerd/encoder/ffmpeg_encoder.cc selfdrive/loggerd/encoder/ffmpeg_encoder.h diff --git a/selfdrive/hardware/tici/test_power_draw.py b/selfdrive/hardware/tici/test_power_draw.py index 95c7505421..0435193bc8 100755 --- a/selfdrive/hardware/tici/test_power_draw.py +++ b/selfdrive/hardware/tici/test_power_draw.py @@ -22,7 +22,7 @@ PROCS = [ Proc('camerad', 2.17), Proc('modeld', 0.95), Proc('dmonitoringmodeld', 0.25), - Proc('loggerd', 0.45, warmup=10.), + Proc('encoderd', 0.42), ] diff --git a/selfdrive/loggerd/SConscript b/selfdrive/loggerd/SConscript index 81eca19a47..a15aac380d 100644 --- a/selfdrive/loggerd/SConscript +++ b/selfdrive/loggerd/SConscript @@ -5,7 +5,7 @@ libs = [common, cereal, messaging, visionipc, 'avformat', 'avcodec', 'swscale', 'avutil', 'yuv', 'OpenCL', 'pthread'] -src = ['logger.cc', 'loggerd.cc', 'encoder/video_writer.cc', 'remote_encoder.cc', 'encoder/encoder.cc'] +src = ['logger.cc', 'video_writer.cc', 'encoder/encoder.cc'] if arch == "larch64": src += ['encoder/v4l_encoder.cc'] else: @@ -19,9 +19,9 @@ if arch == "Darwin": logger_lib = env.Library('logger', src) libs.insert(0, logger_lib) -env.Program('loggerd', ['main.cc'], LIBS=libs) +env.Program('loggerd', ['loggerd.cc'], LIBS=libs) env.Program('encoderd', ['encoderd.cc'], LIBS=libs) env.Program('bootlog.cc', LIBS=libs) if GetOption('test'): - env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_loggerd.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto']) + env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto']) diff --git a/selfdrive/loggerd/encoder/encoder.cc b/selfdrive/loggerd/encoder/encoder.cc index 84f26d9787..86fd441979 100644 --- a/selfdrive/loggerd/encoder/encoder.cc +++ b/selfdrive/loggerd/encoder/encoder.cc @@ -24,7 +24,7 @@ void VideoEncoder::publisher_publish(VideoEncoder *e, int segment_num, uint32_t edata.setTimestampSof(extra.timestamp_sof); edata.setTimestampEof(extra.timestamp_eof); edata.setType(e->codec); - edata.setEncodeId(idx); + edata.setEncodeId(e->cnt++); edata.setSegmentNum(segment_num); edata.setSegmentId(idx); edata.setFlags(flags); diff --git a/selfdrive/loggerd/encoder/encoder.h b/selfdrive/loggerd/encoder/encoder.h index abdcc9c4bf..94c9c551d9 100644 --- a/selfdrive/loggerd/encoder/encoder.h +++ b/selfdrive/loggerd/encoder/encoder.h @@ -7,7 +7,7 @@ #include "cereal/messaging/messaging.h" #include "cereal/visionipc/visionipc.h" #include "selfdrive/common/queue.h" -#include "selfdrive/loggerd/encoder/video_writer.h" +#include "selfdrive/loggerd/video_writer.h" #include "selfdrive/camerad/cameras/camera_common.h" #define V4L2_BUF_FLAG_KEYFRAME 8 @@ -49,6 +49,9 @@ protected: CameraType type; private: + // total frames encoded + int cnt = 0; + // publishing std::unique_ptr pm; const char *service_name; diff --git a/selfdrive/loggerd/encoder/ffmpeg_encoder.h b/selfdrive/loggerd/encoder/ffmpeg_encoder.h index 5f90e198fc..ae41abc69f 100644 --- a/selfdrive/loggerd/encoder/ffmpeg_encoder.h +++ b/selfdrive/loggerd/encoder/ffmpeg_encoder.h @@ -13,7 +13,6 @@ extern "C" { #include "selfdrive/loggerd/encoder/encoder.h" #include "selfdrive/loggerd/loggerd.h" -#include "selfdrive/loggerd/encoder/video_writer.h" class FfmpegEncoder : public VideoEncoder { public: diff --git a/selfdrive/loggerd/encoder/v4l_encoder.cc b/selfdrive/loggerd/encoder/v4l_encoder.cc index f5608a067e..26d79cc2a9 100644 --- a/selfdrive/loggerd/encoder/v4l_encoder.cc +++ b/selfdrive/loggerd/encoder/v4l_encoder.cc @@ -115,8 +115,8 @@ void V4LEncoder::dequeue_handler(V4LEncoder *e) { } if (env_debug_encoder) { - printf("%20s got(%d) %6d bytes flags %8x idx %4d id %8d ts %ld lat %.2f ms (%lu frames free)\n", - e->filename, index, bytesused, flags, idx, frame_id, ts, millis_since_boot()-(ts/1000.), e->free_buf_in.size()); + printf("%20s got(%d) %6d bytes flags %8x idx %3d/%4d id %8d ts %ld lat %.2f ms (%lu frames free)\n", + e->filename, index, bytesused, flags, e->segment_num, idx, frame_id, ts, millis_since_boot()-(ts/1000.), e->free_buf_in.size()); } // requeue the buffer diff --git a/selfdrive/loggerd/encoder/v4l_encoder.h b/selfdrive/loggerd/encoder/v4l_encoder.h index 13ebab20da..2f62e334fc 100644 --- a/selfdrive/loggerd/encoder/v4l_encoder.h +++ b/selfdrive/loggerd/encoder/v4l_encoder.h @@ -2,7 +2,6 @@ #include "selfdrive/common/queue.h" #include "selfdrive/loggerd/encoder/encoder.h" -#include "selfdrive/loggerd/encoder/video_writer.h" #define BUF_IN_COUNT 7 #define BUF_OUT_COUNT 6 diff --git a/selfdrive/loggerd/encoderd.cc b/selfdrive/loggerd/encoderd.cc index 18e88ca8bb..297eaf88d0 100644 --- a/selfdrive/loggerd/encoderd.cc +++ b/selfdrive/loggerd/encoderd.cc @@ -50,7 +50,7 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) { // init encoders if (encoders.empty()) { VisionBuf buf_info = vipc_client.buffers[0]; - LOGD("encoder init %dx%d", buf_info.width, buf_info.height); + LOGW("encoder %s init %dx%d", cam_info.filename, buf_info.width, buf_info.height); // main encoder encoders.push_back(new Encoder(cam_info.filename, cam_info.type, buf_info.width, buf_info.height, @@ -86,11 +86,19 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) { } lagging = false; - if (cam_info.trigger_rotate) { - if (!sync_encoders(s, cam_info.type, extra.frame_id)) { - continue; + if (!sync_encoders(s, cam_info.type, extra.frame_id)) { + continue; + } + if (do_exit) break; + + // do rotation if required + const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; + if (cur_seg >= 0 && extra.frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) { + for (auto &e : encoders) { + e->encoder_close(); + e->encoder_open(NULL); } - if (do_exit) break; + ++cur_seg; } // encode a frame @@ -102,15 +110,6 @@ void encoder_thread(EncoderdState *s, const LogCameraInfo &cam_info) { LOGE("Failed to encode frame. frame_id: %d", extra.frame_id); } } - - const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; - if (cur_seg >= 0 && extra.frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) { - for (auto &e : encoders) { - e->encoder_close(); - e->encoder_open(NULL); - } - ++cur_seg; - } } } @@ -128,7 +127,7 @@ void encoderd_thread() { for (const auto &cam : cameras_logged) { if (cam.enable) { encoder_threads.push_back(std::thread(encoder_thread, &s, cam)); - if (cam.trigger_rotate) s.max_waiting++; + s.max_waiting++; } } for (auto &t : encoder_threads) t.join(); @@ -139,7 +138,7 @@ int main() { int ret; ret = util::set_realtime_priority(52); assert(ret == 0); - ret = util::set_core_affinity({7}); + ret = util::set_core_affinity({3}); assert(ret == 0); } encoderd_thread(); diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 9462abba8f..06fd9c74e5 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -1,163 +1,18 @@ #include "selfdrive/loggerd/loggerd.h" -#include "selfdrive/loggerd/remote_encoder.h" -bool env_remote_encoder = getenv("REMOTE_ENCODER") != NULL; +#include "selfdrive/loggerd/video_writer.h" ExitHandler do_exit; -// Handle initial encoder syncing by waiting for all encoders to reach the same frame id -bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id) { - if (s->camera_synced[cam_type]) return true; - - if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) { - // add a small margin to the start frame id in case one of the encoders already dropped the next frame - update_max_atomic(s->start_frame_id, frame_id + 2); - if (std::exchange(s->camera_ready[cam_type], true) == false) { - ++s->encoders_ready; - LOGD("camera %d encoder ready", cam_type); - } - return false; - } else { - if (s->max_waiting == 1) update_max_atomic(s->start_frame_id, frame_id); - bool synced = frame_id >= s->start_frame_id; - s->camera_synced[cam_type] = synced; - if (!synced) LOGD("camera %d waiting for frame %d, cur %d", cam_type, (int)s->start_frame_id, frame_id); - return synced; - } -} - -bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id) { - const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; - if (cur_seg >= 0 && frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) { - // trigger rotate and wait until the main logger has rotated to the new segment - ++s->ready_to_rotate; - std::unique_lock lk(s->rotate_lock); - s->rotate_cv.wait(lk, [&] { - return s->rotate_segment > cur_seg || do_exit; - }); - return !do_exit; - } - return false; -} - -void encoder_thread(LoggerdState *s, const LogCameraInfo &cam_info) { - util::set_thread_name(cam_info.filename); - - int cur_seg = -1; - int encode_idx = 0; - LoggerHandle *lh = NULL; - std::vector encoders; - VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); - - // While we write them right to the log for sync, we also publish the encode idx to the socket - const char *service_name = cam_info.type == DriverCam ? "driverEncodeIdx" : (cam_info.type == WideRoadCam ? "wideRoadEncodeIdx" : "roadEncodeIdx"); - PubMaster pm({service_name}); - - while (!do_exit) { - if (!vipc_client.connect(false)) { - util::sleep_for(5); - continue; - } - - // init encoders - if (encoders.empty()) { - VisionBuf buf_info = vipc_client.buffers[0]; - LOGD("encoder init %dx%d", buf_info.width, buf_info.height); - - // main encoder - encoders.push_back(new Encoder(cam_info.filename, cam_info.type, buf_info.width, buf_info.height, - cam_info.fps, cam_info.bitrate, - cam_info.is_h265 ? cereal::EncodeIndex::Type::FULL_H_E_V_C : cereal::EncodeIndex::Type::QCAMERA_H264, - buf_info.width, buf_info.height, cam_info.record)); - // qcamera encoder - if (cam_info.has_qcamera) { - encoders.push_back(new Encoder(qcam_info.filename, cam_info.type, buf_info.width, buf_info.height, - qcam_info.fps, qcam_info.bitrate, - qcam_info.is_h265 ? cereal::EncodeIndex::Type::FULL_H_E_V_C : cereal::EncodeIndex::Type::QCAMERA_H264, - qcam_info.frame_width, qcam_info.frame_height, true)); - } - } - - while (!do_exit) { - VisionIpcBufExtra extra; - VisionBuf* buf = vipc_client.recv(&extra); - if (buf == nullptr) continue; - - if (cam_info.trigger_rotate) { - s->last_camera_seen_tms = millis_since_boot(); - if (!sync_encoders(s, cam_info.type, extra.frame_id)) { - continue; - } - - // check if we're ready to rotate - trigger_rotate_if_needed(s, cur_seg, extra.frame_id); - if (do_exit) break; - } - - // rotate the encoder if the logger is on a newer segment - if (s->rotate_segment > cur_seg) { - cur_seg = s->rotate_segment; - - LOGW("camera %d rotate encoder to %s", cam_info.type, s->segment_path); - for (auto &e : encoders) { - e->encoder_close(); - e->encoder_open(s->segment_path); - } - if (lh) { - lh_close(lh); - } - lh = logger_get_handle(&s->logger); - } - - // encode a frame - for (int i = 0; i < encoders.size(); ++i) { - int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v, - buf->width, buf->height, &extra); - - if (out_id == -1) { - LOGE("Failed to encode frame. frame_id: %d encode_id: %d", extra.frame_id, encode_idx); - } - - // publish encode index - if (i == 0 && out_id != -1) { - MessageBuilder msg; - // this is really ugly - bool valid = (buf->get_frame_id() == extra.frame_id); - auto eidx = cam_info.type == DriverCam ? msg.initEvent(valid).initDriverEncodeIdx() : - (cam_info.type == WideRoadCam ? msg.initEvent(valid).initWideRoadEncodeIdx() : msg.initEvent(valid).initRoadEncodeIdx()); - eidx.setFrameId(extra.frame_id); - eidx.setTimestampSof(extra.timestamp_sof); - eidx.setTimestampEof(extra.timestamp_eof); - if (Hardware::TICI()) { - eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C); - } else { - eidx.setType(cam_info.type == DriverCam ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); - } - eidx.setEncodeId(encode_idx); - eidx.setSegmentNum(cur_seg); - eidx.setSegmentId(out_id); - if (lh) { - auto bytes = msg.toBytes(); - lh_log(lh, bytes.begin(), bytes.size(), true); - } - pm.send(service_name, msg); - } - } - - encode_idx++; - } - - if (lh) { - lh_close(lh); - lh = NULL; - } - } - - LOG("encoder destroy"); - for(auto &e : encoders) { - e->encoder_close(); - delete e; - } -} +struct LoggerdState { + LoggerState logger = {}; + char segment_path[4096]; + std::mutex rotate_lock; + std::atomic rotate_segment; + std::atomic last_camera_seen_tms; + std::atomic ready_to_rotate; // count of encoders ready to rotate + int max_waiting = 0; + double last_rotate_tms = 0.; // last rotate time in ms +}; void logger_rotate(LoggerdState *s) { { @@ -169,7 +24,6 @@ void logger_rotate(LoggerdState *s) { s->ready_to_rotate = 0; s->last_rotate_tms = millis_since_boot(); } - s->rotate_cv.notify_all(); LOGW((s->logger.part == 0) ? "logging to %s" : "rotated to %s", s->segment_path); } @@ -187,6 +41,99 @@ void rotate_if_needed(LoggerdState *s) { } } +struct RemoteEncoder { + std::unique_ptr writer; + int segment = -1; + std::vector q; + int logger_segment = -1; + int dropped_frames = 0; + bool recording = false; +}; + +int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re) { + const LogCameraInfo &cam_info = (name == "driverEncodeData") ? cameras_logged[1] : + ((name == "wideRoadEncodeData") ? cameras_logged[2] : + ((name == "qRoadEncodeData") ? qcam_info : cameras_logged[0])); + + // rotation happened, process the queue (happens before the current message) + int bytes_count = 0; + if (re.logger_segment != s->rotate_segment) { + re.logger_segment = s->rotate_segment; + for (auto &qmsg: re.q) { + bytes_count += handle_encoder_msg(s, qmsg, name, re); + } + re.q.clear(); + } + + // extract the message + capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize())); + auto event = cmsg.getRoot(); + auto edata = (name == "driverEncodeData") ? event.getDriverEncodeData() : + ((name == "wideRoadEncodeData") ? event.getWideRoadEncodeData() : + ((name == "qRoadEncodeData") ? event.getQRoadEncodeData() : event.getRoadEncodeData())); + auto idx = edata.getIdx(); + auto flags = idx.getFlags(); + + if (!re.recording) { + // only create on iframe + if (flags & V4L2_BUF_FLAG_KEYFRAME) { + if (re.dropped_frames) { + // this should only happen for the first segment, maybe + LOGD("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames); + re.dropped_frames = 0; + } + // if we aren't recording, don't create the writer + if (cam_info.record) { + re.writer.reset(new VideoWriter(s->segment_path, + cam_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, + cam_info.frame_width, cam_info.frame_height, cam_info.fps, idx.getType())); + // write the header + auto header = edata.getHeader(); + re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false); + } + re.segment = idx.getSegmentNum(); + re.recording = true; + } else { + ++re.dropped_frames; + return bytes_count; + } + } + + if (re.segment != idx.getSegmentNum()) { + if (re.recording) { + // encoder is on the next segment, this segment is over so we close the videowriter + re.writer.reset(); + re.recording = false; + ++s->ready_to_rotate; + LOGD("rotate %d -> %d ready %d/%d for %s", re.segment, idx.getSegmentNum(), s->ready_to_rotate.load(), s->max_waiting, name.c_str()); + } + // queue up all the new segment messages, they go in after the rotate + re.q.push_back(msg); + } else { + if (re.writer) { + auto data = edata.getData(); + re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME); + } + + // put it in log stream as the idx packet + MessageBuilder bmsg; + auto evt = bmsg.initEvent(event.getValid()); + evt.setLogMonoTime(event.getLogMonoTime()); + if (name == "driverEncodeData") { evt.setDriverEncodeIdx(idx); } + if (name == "wideRoadEncodeData") { evt.setWideRoadEncodeIdx(idx); } + if (name == "qRoadEncodeData") { evt.setQRoadEncodeIdx(idx); } + if (name == "roadEncodeData") { evt.setRoadEncodeIdx(idx); } + auto new_msg = bmsg.toBytes(); + logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? + bytes_count += new_msg.size(); + + // this frees the message + delete msg; + } + + return bytes_count; +} + void loggerd_thread() { // setup messaging typedef struct QlogState { @@ -202,7 +149,7 @@ void loggerd_thread() { // subscribe to all socks for (const auto& it : services) { - const bool encoder = env_remote_encoder && (strcmp(it.name+strlen(it.name)-strlen("EncodeData"), "EncodeData") == 0); + const bool encoder = strcmp(it.name+strlen(it.name)-strlen("EncodeData"), "EncodeData") == 0; if (!it.should_log && !encoder) continue; LOGD("logging %s (on port %d)", it.name, it.port); @@ -225,15 +172,10 @@ void loggerd_thread() { // init encoders s.last_camera_seen_tms = millis_since_boot(); - std::vector encoder_threads; for (const auto &cam : cameras_logged) { if (cam.enable) { - if (env_remote_encoder) { - if (cam.has_qcamera) { s.max_waiting++; } - } else { - encoder_threads.push_back(std::thread(encoder_thread, &s, cam)); - } - if (cam.trigger_rotate) s.max_waiting++; + s.max_waiting++; + if (cam.has_qcamera) { s.max_waiting++; } } } @@ -252,6 +194,7 @@ void loggerd_thread() { const bool in_qlog = qs.freq != -1 && (qs.counter++ % qs.freq == 0); if (qs.encoder) { + s.last_camera_seen_tms = millis_since_boot(); bytes_count += handle_encoder_msg(&s, msg, qs.name, remote_encoders[sock]); } else { logger_log(&s.logger, (uint8_t *)msg->getData(), msg->getSize(), in_qlog); @@ -275,10 +218,6 @@ void loggerd_thread() { } } - LOGW("closing encoders"); - s.rotate_cv.notify_all(); - for (auto &t : encoder_threads) t.join(); - LOGW("closing logger"); logger_close(&s.logger, &do_exit); @@ -291,3 +230,18 @@ void loggerd_thread() { // messaging cleanup for (auto &[sock, qs] : qlog_states) delete sock; } + +int main(int argc, char** argv) { + if (Hardware::TICI()) { + int ret; + ret = util::set_core_affinity({0, 1, 2, 3}); + assert(ret == 0); + // TODO: why does this impact camerad timings? + //ret = util::set_realtime_priority(1); + //assert(ret == 0); + } + + loggerd_thread(); + + return 0; +} \ No newline at end of file diff --git a/selfdrive/loggerd/loggerd.h b/selfdrive/loggerd/loggerd.h index 447c4fb4c9..98e3deee00 100644 --- a/selfdrive/loggerd/loggerd.h +++ b/selfdrive/loggerd/loggerd.h @@ -50,7 +50,6 @@ struct LogCameraInfo { int bitrate; bool is_h265; bool has_qcamera; - bool trigger_rotate; bool enable; bool record; }; @@ -64,7 +63,6 @@ const LogCameraInfo cameras_logged[] = { .bitrate = MAIN_BITRATE, .is_h265 = true, .has_qcamera = true, - .trigger_rotate = true, .enable = true, .record = true, .frame_width = 1928, @@ -78,7 +76,6 @@ const LogCameraInfo cameras_logged[] = { .bitrate = DCAM_BITRATE, .is_h265 = true, .has_qcamera = false, - .trigger_rotate = true, .enable = true, .record = Params().getBool("RecordFront"), .frame_width = 1928, @@ -92,7 +89,6 @@ const LogCameraInfo cameras_logged[] = { .bitrate = MAIN_BITRATE, .is_h265 = true, .has_qcamera = false, - .trigger_rotate = true, .enable = Hardware::TICI(), .record = Hardware::TICI(), .frame_width = 1928, @@ -109,26 +105,3 @@ const LogCameraInfo qcam_info = { .frame_width = Hardware::TICI() ? 526 : 480, .frame_height = Hardware::TICI() ? 330 : 360 // keep pixel count the same? }; - -struct LoggerdState { - LoggerState logger = {}; - char segment_path[4096]; - std::mutex rotate_lock; - std::condition_variable rotate_cv; - std::atomic rotate_segment; - std::atomic last_camera_seen_tms; - std::atomic ready_to_rotate; // count of encoders ready to rotate - int max_waiting = 0; - double last_rotate_tms = 0.; // last rotate time in ms - - // Sync logic for startup - std::atomic encoders_ready = 0; - std::atomic start_frame_id = 0; - bool camera_ready[WideRoadCam + 1] = {}; - bool camera_synced[WideRoadCam + 1] = {}; -}; - -bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id); -bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id); -void rotate_if_needed(LoggerdState *s); -void loggerd_thread(); diff --git a/selfdrive/loggerd/main.cc b/selfdrive/loggerd/main.cc deleted file mode 100644 index c9bb35bfdb..0000000000 --- a/selfdrive/loggerd/main.cc +++ /dev/null @@ -1,18 +0,0 @@ -#include "selfdrive/loggerd/loggerd.h" - -#include - -int main(int argc, char** argv) { - if (Hardware::TICI()) { - int ret; - ret = util::set_core_affinity({0, 1, 2, 3}); - assert(ret == 0); - // TODO: why does this impact camerad timings? - //ret = util::set_realtime_priority(1); - //assert(ret == 0); - } - - loggerd_thread(); - - return 0; -} diff --git a/selfdrive/loggerd/remote_encoder.cc b/selfdrive/loggerd/remote_encoder.cc deleted file mode 100644 index c640ab5d6c..0000000000 --- a/selfdrive/loggerd/remote_encoder.cc +++ /dev/null @@ -1,82 +0,0 @@ -#include "selfdrive/loggerd/loggerd.h" -#include "selfdrive/loggerd/remote_encoder.h" - -int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re) { - const LogCameraInfo &cam_info = (name == "driverEncodeData") ? cameras_logged[1] : - ((name == "wideRoadEncodeData") ? cameras_logged[2] : - ((name == "qRoadEncodeData") ? qcam_info : cameras_logged[0])); - if (!cam_info.record) return 0; // TODO: handle this by not subscribing - - // rotation happened, process the queue (happens before the current message) - int bytes_count = 0; - if (re.logger_segment != s->rotate_segment) { - re.logger_segment = s->rotate_segment; - for (auto &qmsg: re.q) { - bytes_count += handle_encoder_msg(s, qmsg, name, re); - } - re.q.clear(); - } - - // extract the message - capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize())); - auto event = cmsg.getRoot(); - auto edata = (name == "driverEncodeData") ? event.getDriverEncodeData() : - ((name == "wideRoadEncodeData") ? event.getWideRoadEncodeData() : - ((name == "qRoadEncodeData") ? event.getQRoadEncodeData() : event.getRoadEncodeData())); - auto idx = edata.getIdx(); - auto flags = idx.getFlags(); - - if (!re.writer) { - // only create on iframe - if (flags & V4L2_BUF_FLAG_KEYFRAME) { - if (re.dropped_frames) { - // this should only happen for the first segment, maybe - LOGD("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames); - re.dropped_frames = 0; - } - re.writer.reset(new VideoWriter(s->segment_path, - cam_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, - cam_info.frame_width, cam_info.frame_height, cam_info.fps, idx.getType())); - // write the header - auto header = edata.getHeader(); - re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false); - re.segment = idx.getSegmentNum(); - } else { - ++re.dropped_frames; - return bytes_count; - } - } - - if (re.segment != idx.getSegmentNum()) { - if (re.writer) { - // encoder is on the next segment, this segment is over so we close the videowriter - re.writer.reset(); - ++s->ready_to_rotate; - LOGD("rotate %d -> %d ready %d/%d", re.segment, idx.getSegmentNum(), s->ready_to_rotate.load(), s->max_waiting); - } - // queue up all the new segment messages, they go in after the rotate - re.q.push_back(msg); - } else { - auto data = edata.getData(); - re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME); - - // put it in log stream as the idx packet - MessageBuilder bmsg; - auto evt = bmsg.initEvent(event.getValid()); - evt.setLogMonoTime(event.getLogMonoTime()); - if (name == "driverEncodeData") { evt.setDriverEncodeIdx(idx); } - if (name == "wideRoadEncodeData") { evt.setWideRoadEncodeIdx(idx); } - if (name == "qRoadEncodeData") { evt.setQRoadEncodeIdx(idx); } - if (name == "roadEncodeData") { evt.setRoadEncodeIdx(idx); } - auto new_msg = bmsg.toBytes(); - logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? - bytes_count += new_msg.size(); - - // this frees the message - delete msg; - } - - return bytes_count; -} - - diff --git a/selfdrive/loggerd/remote_encoder.h b/selfdrive/loggerd/remote_encoder.h deleted file mode 100644 index 79e628f514..0000000000 --- a/selfdrive/loggerd/remote_encoder.h +++ /dev/null @@ -1,11 +0,0 @@ -#include "selfdrive/loggerd/encoder/video_writer.h" - -struct RemoteEncoder { - std::unique_ptr writer; - int segment = -1; - std::vector q; - int logger_segment = -1; - int dropped_frames = 0; -}; - -int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re); \ No newline at end of file diff --git a/selfdrive/loggerd/tests/test_encoder.py b/selfdrive/loggerd/tests/test_encoder.py index 88b06b4b4c..2e69f3b29a 100755 --- a/selfdrive/loggerd/tests/test_encoder.py +++ b/selfdrive/loggerd/tests/test_encoder.py @@ -62,6 +62,7 @@ class TestEncoder(unittest.TestCase): managed_processes['sensord'].start() managed_processes['loggerd'].start() + managed_processes['encoderd'].start() time.sleep(1.0) managed_processes['camerad'].start() @@ -150,6 +151,7 @@ class TestEncoder(unittest.TestCase): time.sleep(0.1) finally: managed_processes['loggerd'].stop() + managed_processes['encoderd'].stop() managed_processes['camerad'].stop() managed_processes['sensord'].stop() diff --git a/selfdrive/loggerd/tests/test_loggerd.cc b/selfdrive/loggerd/tests/test_loggerd.cc deleted file mode 100644 index d84185cbba..0000000000 --- a/selfdrive/loggerd/tests/test_loggerd.cc +++ /dev/null @@ -1,93 +0,0 @@ -#include - -#include "catch2/catch.hpp" -#include "selfdrive/loggerd/loggerd.h" - -int random_int(int min, int max) { - std::random_device dev; - std::mt19937 rng(dev()); - std::uniform_int_distribution dist(min, max); - return dist(rng); -} - -int get_synced_frame_id(LoggerdState *s, CameraType cam_type, int start_frame_id) { - int frame_id = start_frame_id; - while (!sync_encoders(s, cam_type, frame_id)) { - ++frame_id; - usleep(0); - } - return frame_id; -}; - -TEST_CASE("sync_encoders") { - const int max_waiting = GENERATE(1, 2, 3); - - for (int test_cnt = 0; test_cnt < 10; ++test_cnt) { - LoggerdState s{.max_waiting = max_waiting}; - std::vector start_frames; - std::vector> futures; - - for (int i = 0; i < max_waiting; ++i) { - int start_frame_id = random_int(0, 20); - start_frames.push_back(start_frame_id); - futures.emplace_back(std::async(std::launch::async, get_synced_frame_id, &s, (CameraType)i, start_frame_id)); - } - - // get results - int synced_frame_id = 0; - for (int i = 0; i < max_waiting; ++i) { - if (i == 0) { - synced_frame_id = futures[i].get(); - // require synced_frame_id equal start_frame_id if max_waiting == 1 - if (max_waiting == 1) { - REQUIRE(synced_frame_id == start_frames[0]); - } - } else { - REQUIRE(futures[i].get() == synced_frame_id); - } - } - } -} - -const int MAX_SEGMENT_CNT = 100; - -std::pair encoder_thread(LoggerdState *s) { - int cur_seg = 0; - uint32_t frame_id = s->start_frame_id; - - while (cur_seg < MAX_SEGMENT_CNT) { - ++frame_id; - if (trigger_rotate_if_needed(s, cur_seg, frame_id)) { - cur_seg = s->rotate_segment; - } - util::sleep_for(0); - } - - return {cur_seg, frame_id}; -} - -TEST_CASE("trigger_rotate") { - const int encoders = GENERATE(1, 2, 3); - const int start_frame_id = random_int(0, 20); - - LoggerdState s{ - .max_waiting = encoders, - .start_frame_id = start_frame_id, - }; - - std::vector>> futures; - for (int i = 0; i < encoders; ++i) { - futures.emplace_back(std::async(std::launch::async, encoder_thread, &s)); - } - - while (s.rotate_segment < MAX_SEGMENT_CNT) { - rotate_if_needed(&s); - util::sleep_for(10); - } - - for (auto &f : futures) { - auto [encoder_seg, frame_id] = f.get(); - REQUIRE(encoder_seg == MAX_SEGMENT_CNT); - REQUIRE(frame_id == start_frame_id + encoder_seg * (SEGMENT_LENGTH * MAIN_FPS)); - } -} diff --git a/selfdrive/loggerd/tests/test_loggerd.py b/selfdrive/loggerd/tests/test_loggerd.py index f9f938ae7a..1d3fcfce09 100755 --- a/selfdrive/loggerd/tests/test_loggerd.py +++ b/selfdrive/loggerd/tests/test_loggerd.py @@ -126,6 +126,7 @@ class TestLoggerd(unittest.TestCase): length = random.randint(1, 3) os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length) managed_processes["loggerd"].start() + managed_processes["encoderd"].start() fps = 20.0 for n in range(1, int(num_segs*length*fps)+1): @@ -140,6 +141,7 @@ class TestLoggerd(unittest.TestCase): time.sleep(1.0/fps) managed_processes["loggerd"].stop() + managed_processes["encoderd"].stop() route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0] for n in range(num_segs): diff --git a/selfdrive/loggerd/encoder/video_writer.cc b/selfdrive/loggerd/video_writer.cc similarity index 96% rename from selfdrive/loggerd/encoder/video_writer.cc rename to selfdrive/loggerd/video_writer.cc index 4eb66606c6..24ea98eb30 100644 --- a/selfdrive/loggerd/encoder/video_writer.cc +++ b/selfdrive/loggerd/video_writer.cc @@ -1,7 +1,7 @@ #include #include -#include "selfdrive/loggerd/encoder/video_writer.h" +#include "selfdrive/loggerd/video_writer.h" #include "selfdrive/common/swaglog.h" #include "selfdrive/common/util.h" @@ -88,7 +88,7 @@ void VideoWriter::write(uint8_t *data, int len, long long timestamp, bool codecc // TODO: can use av_write_frame for non raw? int err = av_interleaved_write_frame(ofmt_ctx, &pkt); - if (err < 0) { LOGW("ts encoder write issue"); } + if (err < 0) { LOGW("ts encoder write issue len: %d ts: %lu", len, timestamp); } av_packet_unref(&pkt); } diff --git a/selfdrive/loggerd/encoder/video_writer.h b/selfdrive/loggerd/video_writer.h similarity index 100% rename from selfdrive/loggerd/encoder/video_writer.h rename to selfdrive/loggerd/video_writer.h diff --git a/selfdrive/manager/process_config.py b/selfdrive/manager/process_config.py index 901a928796..ad3cabbbd9 100644 --- a/selfdrive/manager/process_config.py +++ b/selfdrive/manager/process_config.py @@ -24,6 +24,7 @@ procs = [ NativeProcess("clocksd", "selfdrive/clocksd", ["./clocksd"]), NativeProcess("dmonitoringmodeld", "selfdrive/modeld", ["./dmonitoringmodeld"], enabled=(not PC or WEBCAM), callback=driverview), NativeProcess("logcatd", "selfdrive/logcatd", ["./logcatd"]), + NativeProcess("encoderd", "selfdrive/loggerd", ["./encoderd"]), NativeProcess("loggerd", "selfdrive/loggerd", ["./loggerd"], onroad=False, callback=logging), NativeProcess("modeld", "selfdrive/modeld", ["./modeld"]), NativeProcess("navd", "selfdrive/ui/navd", ["./navd"], offroad=True), diff --git a/selfdrive/test/test_onroad.py b/selfdrive/test/test_onroad.py index 8644baac8d..f098d353d2 100755 --- a/selfdrive/test/test_onroad.py +++ b/selfdrive/test/test_onroad.py @@ -22,7 +22,8 @@ from tools.lib.logreader import LogReader # Baseline CPU usage by process PROCS = { "selfdrive.controls.controlsd": 31.0, - "./loggerd": 50.0, + "./loggerd": 10.0, + "./encoderd": 37.3, "./camerad": 16.5, "./locationd": 9.1, "selfdrive.controls.plannerd": 11.7,