diff --git a/selfdrive/loggerd/encoder.c b/selfdrive/loggerd/encoder.c index 1ebb1dd331..91b9dc8df5 100644 --- a/selfdrive/loggerd/encoder.c +++ b/selfdrive/loggerd/encoder.c @@ -650,8 +650,8 @@ void encoder_close(EncoderState *s) { fclose(s->of); } unlink(s->lock_path); + s->open = false; } - s->open = false; pthread_mutex_unlock(&s->lock); } diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index a247aaffaf..fdf958018e 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -41,8 +40,7 @@ #include "services.h" #if !(defined(QCOM) || defined(QCOM2)) -// no encoder on PC -#define DISABLE_ENCODER +#define DISABLE_ENCODER // TODO: loggerd for PC #endif #ifndef DISABLE_ENCODER @@ -106,9 +104,10 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = { #endif }, }; -#define SEGMENT_LENGTH 60 -#define LOG_ROOT "/data/media/0/realdata" + +constexpr int SEGMENT_LENGTH = 60; +const char* LOG_ROOT = "/data/media/0/realdata"; namespace { @@ -127,7 +126,7 @@ static void set_do_exit(int sig) { do_exit = 1; } -static bool file_exists (const std::string& fn) { +static bool file_exists(const std::string& fn) { std::ifstream f(fn); return f.good(); } @@ -199,34 +198,17 @@ struct LoggerdState { LoggerdState s; #ifndef DISABLE_ENCODER -void encoder_thread(RotateState *rotate_state, int cam_idx) { +void encoder_thread(int cam_idx) { + assert(cam_idx < LOG_CAMERA_ID_MAX-1); - switch (cam_idx) { - case LOG_CAMERA_ID_DCAMERA: { - LOGW("recording front camera"); - set_thread_name("FrontCameraEncoder"); - break; - } - case LOG_CAMERA_ID_FCAMERA: { - set_thread_name("RearCameraEncoder"); - break; - } - case LOG_CAMERA_ID_ECAMERA: { - set_thread_name("WideCameraEncoder"); - break; - } - default: { - LOGE("unexpected camera index provided"); - assert(false); - } - } + LogCameraInfo &cam_info = cameras_logged[cam_idx]; + set_thread_name(cam_info.filename); VisionStream stream; + RotateState &rotate_state = s.rotate_state[cam_idx]; + rotate_state.enabled = true; - bool encoder_inited = false; - EncoderState encoder; - EncoderState encoder_alt; - bool has_encoder_alt = cameras_logged[cam_idx].has_qcamera; + std::vector encoders; int encoder_segment = -1; int cnt = 0; @@ -239,34 +221,28 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) { while (!do_exit) { VisionStreamBufs buf_info; - int err = visionstream_init(&stream, cameras_logged[cam_idx].stream_type, false, &buf_info); + int err = visionstream_init(&stream, cam_info.stream_type, false, &buf_info); if (err != 0) { LOGD("visionstream connect fail"); util::sleep_for(100); continue; } - if (!encoder_inited) { + if (encoders.empty()) { LOGD("encoder init %dx%d", buf_info.width, buf_info.height); - encoder_init(&encoder, cameras_logged[cam_idx].filename, - buf_info.width, - buf_info.height, - cameras_logged[cam_idx].fps, - cameras_logged[cam_idx].bitrate, - cameras_logged[cam_idx].is_h265, - cameras_logged[cam_idx].downscale); - - if (has_encoder_alt) { - encoder_init(&encoder_alt, cameras_logged[LOG_CAMERA_ID_QCAMERA].filename, - cameras_logged[LOG_CAMERA_ID_QCAMERA].frame_width, - cameras_logged[LOG_CAMERA_ID_QCAMERA].frame_height, - cameras_logged[LOG_CAMERA_ID_QCAMERA].fps, - cameras_logged[LOG_CAMERA_ID_QCAMERA].bitrate, - cameras_logged[LOG_CAMERA_ID_QCAMERA].is_h265, - cameras_logged[LOG_CAMERA_ID_QCAMERA].downscale); - } - encoder_inited = true; + // main encoder + encoders.push_back(new EncoderState()); + encoder_init(encoders[0], cam_info.filename, buf_info.width, buf_info.height, + cam_info.fps, cam_info.bitrate, cam_info.is_h265, cam_info.downscale); + + // qcamera encoder + if (cam_info.has_qcamera) { + LogCameraInfo &qcam_info = cameras_logged[LOG_CAMERA_ID_QCAMERA]; + encoders.push_back(new EncoderState()); + encoder_init(encoders[1], qcam_info.filename, qcam_info.frame_width, qcam_info.frame_height, + qcam_info.fps, qcam_info.bitrate, qcam_info.is_h265, qcam_info.downscale); + } } while (!do_exit) { @@ -288,23 +264,25 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) { pthread_mutex_unlock(&s.rotate_lock); // wait if camera pkt id is older than stream - rotate_state->waitLogThread(); + rotate_state.waitLogThread(); if (do_exit) break; // rotate the encoder if the logger is on a newer segment - if (rotate_state->should_rotate) { - if (!rotate_state->initialized) { - rotate_state->last_rotate_frame_id = extra.frame_id - 1; - rotate_state->initialized = true; + if (rotate_state.should_rotate) { + if (!rotate_state.initialized) { + rotate_state.last_rotate_frame_id = extra.frame_id - 1; + rotate_state.initialized = true; } + while (s.rotate_seq_id != my_idx && !do_exit) { usleep(1000); } + LOGW("camera %d rotate encoder to %s.", cam_idx, s.segment_path); - encoder_rotate(&encoder, s.segment_path, s.rotate_segment); - s.rotate_seq_id = (my_idx + 1) % s.num_encoder; - if (has_encoder_alt) { - encoder_rotate(&encoder_alt, s.segment_path, s.rotate_segment); + for (auto &e : encoders) { + encoder_rotate(e, s.segment_path, s.rotate_segment); } + + s.rotate_seq_id = (my_idx + 1) % s.num_encoder; encoder_segment = s.rotate_segment; if (lh) { lh_close(lh); @@ -319,28 +297,25 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) { pthread_mutex_lock(&s.rotate_lock); s.should_close = s.should_close == s.num_encoder ? 1 - s.num_encoder : s.should_close + 1; - - encoder_close(&encoder); - encoder_open(&encoder, encoder.next_path); - encoder.segment = encoder.next_segment; - encoder.rotating = false; - if (has_encoder_alt) { - encoder_close(&encoder_alt); - encoder_open(&encoder_alt, encoder_alt.next_path); - encoder_alt.segment = encoder_alt.next_segment; - encoder_alt.rotating = false; + + for (auto &e : encoders) { + encoder_close(e); + encoder_open(e, e->next_path); + e->segment = e->next_segment; + e->rotating = false; } + s.finish_close += 1; pthread_mutex_unlock(&s.rotate_lock); while(s.finish_close > 0 && s.finish_close < s.num_encoder && !do_exit) { usleep(1000); } s.finish_close = 0; - rotate_state->finish_rotate(); + rotate_state.finish_rotate(); } } - rotate_state->setStreamFrameId(extra.frame_id); + rotate_state.setStreamFrameId(extra.frame_id); uint8_t *y = (uint8_t*)buf->addr; uint8_t *u = y + (buf_info.width*buf_info.height); @@ -348,14 +323,12 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) { { // encode hevc int out_segment = -1; - int out_id = encoder_encode_frame(&encoder, - y, u, v, + int out_id = encoder_encode_frame(encoders[0], y, u, v, buf_info.width, buf_info.height, &out_segment, &extra); - if (has_encoder_alt) { + if (encoders.size() > 1) { int out_segment_alt = -1; - encoder_encode_frame(&encoder_alt, - y, u, v, + encoder_encode_frame(encoders[1], y, u, v, buf_info.width, buf_info.height, &out_segment_alt, &extra); } @@ -378,8 +351,8 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) { eidx.setSegmentNum(out_segment); eidx.setSegmentId(out_id); - auto bytes = msg.toBytes(); if (lh) { + auto bytes = msg.toBytes(); lh_log(lh, bytes.begin(), bytes.size(), false); } } @@ -395,16 +368,11 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) { visionstream_destroy(&stream); } - if (encoder_inited) { - LOG("encoder destroy"); - encoder_close(&encoder); - encoder_destroy(&encoder); - } - - if (has_encoder_alt) { - LOG("encoder alt destroy"); - encoder_close(&encoder_alt); - encoder_destroy(&encoder_alt); + LOG("encoder destroy"); + for(auto &e : encoders) { + encoder_close(e); + encoder_destroy(e); + delete e; } } #endif @@ -576,18 +544,17 @@ int main(int argc, char** argv) { signal(SIGINT, (sighandler_t)set_do_exit); signal(SIGTERM, (sighandler_t)set_do_exit); - s.ctx = Context::create(); - Poller * poller = Poller::create(); - - // subscribe to all services - - std::vector socks; typedef struct QlogState { int counter, freq; } QlogState; std::map qlog_states; + // setup messaging + std::vector socks; + + s.ctx = Context::create(); + Poller * poller = Poller::create(); for (const auto& it : services) { std::string name = it.name; @@ -605,32 +572,31 @@ int main(int argc, char** argv) { } } + // init logger { auto words = gen_init_data(); auto bytes = words.asBytes(); logger_init(&s.logger, "rlog", bytes.begin(), bytes.size(), true); } + // init encoders s.rotate_seq_id = 0; s.should_close = 0; s.finish_close = 0; s.num_encoder = 0; pthread_mutex_init(&s.rotate_lock, NULL); + + std::vector encoder_threads; #ifndef DISABLE_ENCODER - // rear camera - std::thread encoder_thread_handle(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_FCAMERA], LOG_CAMERA_ID_FCAMERA); - s.rotate_state[LOG_CAMERA_ID_FCAMERA].enabled = true; - // front camera - std::thread front_encoder_thread_handle; + encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_FCAMERA)); + if (record_front) { - front_encoder_thread_handle = std::thread(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_DCAMERA], LOG_CAMERA_ID_DCAMERA); - s.rotate_state[LOG_CAMERA_ID_DCAMERA].enabled = true; + encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_DCAMERA)); } - #ifdef QCOM2 - // wide camera - std::thread wide_encoder_thread_handle(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_ECAMERA], LOG_CAMERA_ID_ECAMERA); - s.rotate_state[LOG_CAMERA_ID_ECAMERA].enabled = true; - #endif + +#ifdef QCOM2 + encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_ECAMERA)); +#endif #endif uint64_t msg_count = 0; @@ -641,11 +607,11 @@ int main(int argc, char** argv) { double last_rotate_tms = millis_since_boot(); double last_camera_seen_tms = millis_since_boot(); while (!do_exit) { - for (auto sock : poller->poll(100 * 1000)) { - Message * last_msg = nullptr; - while (true) { + for (auto sock : poller->poll(100 * 1000)) { + Message * last_msg = nullptr; + while (!do_exit) { Message * msg = sock->receive(true); - if (msg == NULL){ + if (!msg){ break; } delete last_msg; @@ -739,28 +705,17 @@ int main(int argc, char** argv) { } } - LOGW("joining threads"); - for (int cid=0;cid<=MAX_CAM_IDX;cid++) { s.rotate_state[cid].cancelWait(); } - -#ifndef DISABLE_ENCODER -#ifdef QCOM2 - wide_encoder_thread_handle.join(); -#endif - if (record_front) { - front_encoder_thread_handle.join(); - } - encoder_thread_handle.join(); - LOGW("encoder joined"); -#endif + LOGW("closing encoders"); + for (auto &r : s.rotate_state) r.cancelWait(); + for (auto &t : encoder_threads) t.join(); + LOGW("closing logger"); logger_close(&s.logger); - LOGW("logger closed"); - - for (auto s : socks){ - delete s; - } + // messaging cleanup + for (auto sock : socks) delete sock; delete poller; delete s.ctx; + return 0; }