From ae4f3c98d5cb95647fb24c518f1d4032cec7765f Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Fri, 6 Aug 2021 11:10:33 +0200 Subject: [PATCH] refactor loggerd: trigger rotate in encoder thread (#21860) * trigger rotate in encode thread * rotate in time * lgtm * dcam trigger rotate on C3 * check trigger rotate field * Use >= * add rotator thread * set cnt to 0 * test encodeIdx is increasing across segments * test both segmentId and encodeId * fix encodeIdx * no thread needed * add log for failed to encode * stricter C3 test, don't check start encodeId on C2 dcam * only update last_camera_seen_tms when considered for rotate Co-authored-by: deanlee old-commit-hash: a39873872de5b2239e4b89119974e5aa961eba48 --- selfdrive/camerad/cameras/camera_common.h | 1 + selfdrive/loggerd/loggerd.cc | 322 +++++++--------------- selfdrive/loggerd/tests/test_encoder.py | 32 ++- 3 files changed, 121 insertions(+), 234 deletions(-) diff --git a/selfdrive/camerad/cameras/camera_common.h b/selfdrive/camerad/cameras/camera_common.h index 16b5942d07..2ba348f9c0 100644 --- a/selfdrive/camerad/cameras/camera_common.h +++ b/selfdrive/camerad/cameras/camera_common.h @@ -63,6 +63,7 @@ typedef struct LogCameraInfo { bool is_h265; bool downscale; bool has_qcamera; + bool trigger_rotate; } LogCameraInfo; typedef struct FrameMetadata { diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 9ae49fb3b9..b2ecc4bc65 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include "cereal/messaging/messaging.h" #include "cereal/services.h" @@ -59,7 +60,8 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = { .bitrate = MAIN_BITRATE, .is_h265 = true, .downscale = false, - .has_qcamera = true + .has_qcamera = true, + .trigger_rotate = true }, [LOG_CAMERA_ID_DCAMERA] = { .stream_type = VISION_STREAM_YUV_FRONT, @@ -69,7 +71,8 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = { .bitrate = DCAM_BITRATE, 
.is_h265 = true, .downscale = false, - .has_qcamera = false + .has_qcamera = false, + .trigger_rotate = Hardware::TICI(), }, [LOG_CAMERA_ID_ECAMERA] = { .stream_type = VISION_STREAM_YUV_WIDE, @@ -79,7 +82,8 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = { .bitrate = MAIN_BITRATE, .is_h265 = true, .downscale = false, - .has_qcamera = false + .has_qcamera = false, + .trigger_rotate = true }, [LOG_CAMERA_ID_QCAMERA] = { .filename = "qcamera.ts", @@ -92,81 +96,27 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = { }, }; -class RotateState { -public: - SubSocket* fpkt_sock; - uint32_t stream_frame_id, log_frame_id, last_rotate_frame_id; - bool enabled, should_rotate, initialized; - std::atomic rotating; - std::atomic cur_seg; - - RotateState() : fpkt_sock(nullptr), stream_frame_id(0), log_frame_id(0), - last_rotate_frame_id(UINT32_MAX), enabled(false), should_rotate(false), initialized(false), rotating(false), cur_seg(-1) {}; - - void waitLogThread() { - std::unique_lock lk(fid_lock); - while (stream_frame_id > log_frame_id // if the log camera is older, wait for it to catch up. 
- && (stream_frame_id - log_frame_id) < 8 // but if its too old then there probably was a discontinuity (visiond restarted) - && !do_exit) { - cv.wait(lk); - } - } - - void cancelWait() { - cv.notify_one(); - } - - void setStreamFrameId(uint32_t frame_id) { - fid_lock.lock(); - stream_frame_id = frame_id; - fid_lock.unlock(); - cv.notify_one(); - } - - void setLogFrameId(uint32_t frame_id) { - fid_lock.lock(); - log_frame_id = frame_id; - fid_lock.unlock(); - cv.notify_one(); - } - - void rotate() { - if (enabled) { - std::unique_lock lk(fid_lock); - should_rotate = true; - last_rotate_frame_id = stream_frame_id; - } - } - - void finish_rotate() { - std::unique_lock lk(fid_lock); - should_rotate = false; - } - -private: - std::mutex fid_lock; - std::condition_variable cv; -}; - struct LoggerdState { Context *ctx; LoggerState logger = {}; char segment_path[4096]; - int rotate_segment; - pthread_mutex_t rotate_lock; - RotateState rotate_state[LOG_CAMERA_ID_MAX-1]; + std::mutex rotate_lock; + std::condition_variable rotate_cv; + std::atomic rotate_segment; + std::atomic last_camera_seen_tms; + std::atomic waiting_rotate; + int max_waiting = 0; + double last_rotate_tms = 0.; }; LoggerdState s; void encoder_thread(int cam_idx) { assert(cam_idx < LOG_CAMERA_ID_MAX-1); - - LogCameraInfo &cam_info = cameras_logged[cam_idx]; - RotateState &rotate_state = s.rotate_state[cam_idx]; - + const LogCameraInfo &cam_info = cameras_logged[cam_idx]; set_thread_name(cam_info.filename); - int cnt = 0; + int cnt = 0, cur_seg = -1; + int encode_idx = 0; LoggerHandle *lh = NULL; std::vector encoders; VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); @@ -198,68 +148,47 @@ void encoder_thread(int cam_idx) { while (!do_exit) { VisionIpcBufExtra extra; VisionBuf* buf = vipc_client.recv(&extra); - if (buf == nullptr) { - continue; - } + if (buf == nullptr) continue; - //printf("logger latency to tsEof: %f\n", (double)(nanos_since_boot() - 
extra.timestamp_eof) / 1000000.0); - - // all the rotation stuff - { - pthread_mutex_lock(&s.rotate_lock); - pthread_mutex_unlock(&s.rotate_lock); - - // wait if camera pkt id is older than stream - rotate_state.waitLogThread(); - - if (do_exit) break; - - // rotate the encoder if the logger is on a newer segment - if (rotate_state.should_rotate) { - LOGW("camera %d rotate encoder to %s", cam_idx, s.segment_path); - - if (!rotate_state.initialized) { - rotate_state.last_rotate_frame_id = extra.frame_id - 1; - rotate_state.initialized = true; - } - - // get new logger handle for new segment - if (lh) { - lh_close(lh); - } - lh = logger_get_handle(&s.logger); + if (cam_info.trigger_rotate) { + s.last_camera_seen_tms = millis_since_boot(); + } - // wait for all to start rotating - rotate_state.rotating = true; - for(auto &r : s.rotate_state) { - while(r.enabled && !r.rotating && !do_exit) util::sleep_for(5); - } + if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) { + // trigger rotate and wait logger rotated to new segment + ++s.waiting_rotate; + std::unique_lock lk(s.rotate_lock); + s.rotate_cv.wait(lk, [&] { return s.rotate_segment > cur_seg || do_exit; }); + } + if (do_exit) break; - pthread_mutex_lock(&s.rotate_lock); - for (auto &e : encoders) { - e->encoder_close(); - e->encoder_open(s.segment_path); - } - rotate_state.cur_seg = s.rotate_segment; - pthread_mutex_unlock(&s.rotate_lock); + // rotate the encoder if the logger is on a newer segment + if (s.rotate_segment > cur_seg) { + cur_seg = s.rotate_segment; + cnt = 0; - // wait for all to finish rotating - for(auto &r : s.rotate_state) { - while(r.enabled && r.cur_seg != s.rotate_segment && !do_exit) util::sleep_for(5); - } - rotate_state.rotating = false; - rotate_state.finish_rotate(); + LOGW("camera %d rotate encoder to %s", cam_idx, s.segment_path); + for (auto &e : encoders) { + e->encoder_close(); + e->encoder_open(s.segment_path); } + if (lh) { + lh_close(lh); + } + lh = 
logger_get_handle(&s.logger); } - rotate_state.setStreamFrameId(extra.frame_id); - // encode a frame for (int i = 0; i < encoders.size(); ++i) { int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, extra.timestamp_eof); + + if (out_id == -1) { + LOGE("Failed to encode frame. frame_id: %d encode_id: %d", extra.frame_id, encode_idx); + } + + // publish encode index if (i == 0 && out_id != -1) { - // publish encode index MessageBuilder msg; // this is really ugly auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initDriverEncodeIdx() : @@ -272,8 +201,8 @@ void encoder_thread(int cam_idx) { } else { eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); } - eidx.setEncodeId(cnt); - eidx.setSegmentNum(rotate_state.cur_seg); + eidx.setEncodeId(encode_idx); + eidx.setSegmentNum(cur_seg); eidx.setSegmentId(out_id); if (lh) { // TODO: this should read cereal/services.h for qlog decimation @@ -284,6 +213,7 @@ void encoder_thread(int cam_idx) { } cnt++; + encode_idx++; } if (lh) { @@ -311,6 +241,33 @@ void clear_locks() { ftw(LOG_ROOT.c_str(), clear_locks_fn, 16); } +void logger_rotate() { + { + std::unique_lock lk(s.rotate_lock); + int segment = -1; + int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment); + assert(err == 0); + s.rotate_segment = segment; + s.waiting_rotate = 0; + s.last_rotate_tms = millis_since_boot(); + } + s.rotate_cv.notify_all(); + LOGW((s.logger.part == 0) ? "logging to %s" : "rotated to %s", s.segment_path); +} + +void rotate_if_needed() { + if (s.waiting_rotate == s.max_waiting) { + logger_rotate(); + } + + double tms = millis_since_boot(); + if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 && + (tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE) { + LOGW("no camera packet seen. 
auto rotating"); + logger_rotate(); + } +} + } // namespace int main(int argc, char** argv) { @@ -322,11 +279,10 @@ int main(int argc, char** argv) { typedef struct QlogState { int counter, freq; } QlogState; - std::map qlog_states; + std::unordered_map qlog_states; s.ctx = Context::create(); Poller * poller = Poller::create(); - std::vector socks; // subscribe to all socks for (const auto& it : services) { @@ -335,13 +291,6 @@ int main(int argc, char** argv) { SubSocket * sock = SubSocket::create(s.ctx, it.name); assert(sock != NULL); poller->registerSocket(sock); - socks.push_back(sock); - - for (int cid=0; cid<=MAX_CAM_IDX; cid++) { - if (std::string(it.name) == cameras_logged[cid].frame_packet_name) { - s.rotate_state[cid].fpkt_sock = sock; - } - } qlog_states[sock] = {.counter = 0, .freq = it.decimation}; } @@ -349,124 +298,57 @@ int main(int argc, char** argv) { // init logger logger_init(&s.logger, "rlog", true); + logger_rotate(); params.put("CurrentRoute", s.logger.route_name); // init encoders - pthread_mutex_init(&s.rotate_lock, NULL); - + s.last_camera_seen_tms = millis_since_boot(); // TODO: create these threads dynamically on frame packet presence std::vector encoder_threads; encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_FCAMERA)); - s.rotate_state[LOG_CAMERA_ID_FCAMERA].enabled = true; + if (cameras_logged[LOG_CAMERA_ID_FCAMERA].trigger_rotate) { + s.max_waiting += 1; + } if (!Hardware::PC() && params.getBool("RecordFront")) { encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_DCAMERA)); - s.rotate_state[LOG_CAMERA_ID_DCAMERA].enabled = true; + if (cameras_logged[LOG_CAMERA_ID_DCAMERA].trigger_rotate) { + s.max_waiting += 1; + } } if (Hardware::TICI()) { encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_ECAMERA)); - s.rotate_state[LOG_CAMERA_ID_ECAMERA].enabled = true; + if (cameras_logged[LOG_CAMERA_ID_ECAMERA].trigger_rotate) { + s.max_waiting += 1; + } } - uint64_t msg_count = 0; - uint64_t 
bytes_count = 0; - AlignedBuffer aligned_buf; - - double start_ts = seconds_since_boot(); - double last_rotate_tms = millis_since_boot(); - double last_camera_seen_tms = millis_since_boot(); + uint64_t msg_count = 0, bytes_count = 0; + double start_ts = millis_since_boot(); while (!do_exit) { - // TODO: fix msgs from the first poll getting dropped // poll for new messages on all sockets for (auto sock : poller->poll(1000)) { - // drain socket - Message * last_msg = nullptr; - while (!do_exit) { - Message * msg = sock->receive(true); - if (!msg) { - break; - } - delete last_msg; - last_msg = msg; - - QlogState& qs = qlog_states[sock]; - logger_log(&s.logger, (uint8_t*)msg->getData(), msg->getSize(), qs.counter == 0 && qs.freq != -1); - if (qs.freq != -1) { - qs.counter = (qs.counter + 1) % qs.freq; - } - + QlogState &qs = qlog_states[sock]; + Message *msg = nullptr; + while (!do_exit && (msg = sock->receive(true))) { + const bool in_qlog = qs.freq != -1 && (qs.counter++ % qs.freq == 0); + logger_log(&s.logger, (uint8_t *)msg->getData(), msg->getSize(), in_qlog); bytes_count += msg->getSize(); - if ((++msg_count % 1000) == 0) { - double ts = seconds_since_boot(); - LOGD("%lu messages, %.2f msg/sec, %.2f KB/sec", msg_count, msg_count * 1.0 / (ts - start_ts), bytes_count * 0.001 / (ts - start_ts)); - } - } + delete msg; - if (last_msg) { - int fpkt_id = -1; - for (int cid = 0; cid <=MAX_CAM_IDX; cid++) { - if (sock == s.rotate_state[cid].fpkt_sock) { - fpkt_id=cid; - break; - } - } - if (fpkt_id >= 0) { - // track camera frames to sync to encoder - // only process last frame - capnp::FlatArrayMessageReader cmsg(aligned_buf.align(last_msg)); - cereal::Event::Reader event = cmsg.getRoot(); - - if (fpkt_id == LOG_CAMERA_ID_FCAMERA) { - s.rotate_state[fpkt_id].setLogFrameId(event.getRoadCameraState().getFrameId()); - } else if (fpkt_id == LOG_CAMERA_ID_DCAMERA) { - s.rotate_state[fpkt_id].setLogFrameId(event.getDriverCameraState().getFrameId()); - } else if (fpkt_id == 
LOG_CAMERA_ID_ECAMERA) { - s.rotate_state[fpkt_id].setLogFrameId(event.getWideRoadCameraState().getFrameId()); - } - last_camera_seen_tms = millis_since_boot(); - } - } - delete last_msg; - } + rotate_if_needed(); - bool new_segment = s.logger.part == -1; - if (s.logger.part > -1) { - double tms = millis_since_boot(); - if (tms - last_camera_seen_tms <= NO_CAMERA_PATIENCE && encoder_threads.size() > 0) { - new_segment = true; - for (auto &r : s.rotate_state) { - // this *should* be redundant on tici since all camera frames are synced - new_segment &= (((r.stream_frame_id >= r.last_rotate_frame_id + SEGMENT_LENGTH * MAIN_FPS) && - (!r.should_rotate) && (r.initialized)) || - (!r.enabled)); - if (!Hardware::TICI()) break; // only look at fcamera frame id if not QCOM2 - } - } else { - if (tms - last_rotate_tms > SEGMENT_LENGTH * 1000) { - new_segment = true; - LOGW("no camera packet seen. auto rotated"); + if ((++msg_count % 1000) == 0) { + double seconds = (millis_since_boot() - start_ts) / 1000.0; + LOGD("%lu messages, %.2f msg/sec, %.2f KB/sec", msg_count, msg_count / seconds, bytes_count * 0.001 / seconds); } } } - - // rotate to new segment - if (new_segment) { - pthread_mutex_lock(&s.rotate_lock); - last_rotate_tms = millis_since_boot(); - - int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &s.rotate_segment); - assert(err == 0); - LOGW((s.logger.part == 0) ? 
"logging to %s" : "rotated to %s", s.segment_path); - - // rotate encoders - for (auto &r : s.rotate_state) r.rotate(); - pthread_mutex_unlock(&s.rotate_lock); - } } LOGW("closing encoders"); - for (auto &r : s.rotate_state) r.cancelWait(); + s.rotate_cv.notify_all(); for (auto &t : encoder_threads) t.join(); LOGW("closing logger"); @@ -479,7 +361,7 @@ int main(int argc, char** argv) { } // messaging cleanup - for (auto sock : socks) delete sock; + for (auto &[sock, qs] : qlog_states) delete sock; delete poller; delete s.ctx; diff --git a/selfdrive/loggerd/tests/test_encoder.py b/selfdrive/loggerd/tests/test_encoder.py index 2b05ce9808..5cd7da5a02 100755 --- a/selfdrive/loggerd/tests/test_encoder.py +++ b/selfdrive/loggerd/tests/test_encoder.py @@ -88,6 +88,8 @@ class TestEncoder(unittest.TestCase): if not record_front and "dcamera" in camera: continue + eon_dcam = EON and (camera == 'dcamera.hevc') + file_path = f"{route_prefix_path}--{i}/{camera}" # check file size @@ -103,30 +105,32 @@ class TestEncoder(unittest.TestCase): cmd = "LD_LIBRARY_PATH=/usr/local/lib " + cmd expected_frames = fps * SEGMENT_LENGTH - frame_tolerance = 1 if (EON and camera == 'dcamera.hevc') else 0 + frame_tolerance = 1 if eon_dcam else 0 probe = subprocess.check_output(cmd, shell=True, encoding='utf8') frame_count = int(probe.split('\n')[0].strip()) counts.append(frame_count) - if EON: - self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, - f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") - else: - # loggerd waits for the slowest camera, so check count is at least the expected count, - # then check the min of the frame counts is exactly the expected frame count - self.assertTrue(frame_count >= expected_frames, - f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") + self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, + f"{camera} failed frame count check: expected {expected_frames}, 
got {frame_count}") # Check encodeIdx if encode_idx_name is not None: rlog_path = f"{route_prefix_path}--{i}/rlog.bz2" - idxs = [getattr(m, encode_idx_name).segmentId for m in LogReader(rlog_path) if m.which() == encode_idx_name] - self.assertEqual(frame_count, len(idxs)) + + segment_idxs = [getattr(m, encode_idx_name).segmentId for m in LogReader(rlog_path) if m.which() == encode_idx_name] + encode_idxs = [getattr(m, encode_idx_name).encodeId for m in LogReader(rlog_path) if m.which() == encode_idx_name] + + # Check frame count + self.assertEqual(frame_count, len(segment_idxs)) + self.assertEqual(frame_count, len(encode_idxs)) # Check for duplicates or skips - self.assertEqual(0, idxs[0]) - self.assertEqual(len(idxs)-1, idxs[-1]) - self.assertEqual(len(set(idxs)), len(idxs)) + self.assertEqual(0, segment_idxs[0]) + self.assertEqual(len(set(segment_idxs)), len(segment_idxs)) + + if not eon_dcam: + self.assertEqual(expected_frames * i, encode_idxs[0]) + self.assertEqual(len(set(encode_idxs)), len(encode_idxs)) if TICI: expected_frames = fps * SEGMENT_LENGTH