diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index 898216e5b6..144ab9f349 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -62,6 +62,7 @@ struct RemoteEncoder { bool recording = false; bool marked_ready_to_rotate = false; bool seen_first_packet = false; + bool audio_initialized = false; }; size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEncoder &re, const EncoderInfo &encoder_info) { @@ -78,12 +79,7 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc LOGW("%s: dropped %d non iframe packets before init", encoder_info.publish_name, re.dropped_frames); re.dropped_frames = 0; } - // if we aren't actually recording, don't create the writer if (encoder_info.record) { - assert(encoder_info.filename != NULL); - re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(), - encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, - edata.getWidth(), edata.getHeight(), encoder_info.fps, idx.getType())); // write the header auto header = edata.getHeader(); re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof() / 1000, true, false); @@ -138,12 +134,19 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets if (re.current_segment != s->logger.segment()) { - if (re.recording) { - re.writer.reset(); + // if we aren't actually recording, don't create the writer + if (encoder_info.record) { + assert(encoder_info.filename != NULL); + re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(), + encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, + edata.getWidth(), edata.getHeight(), encoder_info.fps, idx.getType())); re.recording = false; + re.audio_initialized = false; } re.current_segment = s->logger.segment(); re.marked_ready_to_rotate = false; + } + if (re.audio_initialized || !encoder_info.include_audio) { // we are in this segment now, process any queued messages before this one if (!re.q.empty()) { for (auto qmsg : re.q) { @@ -153,9 +156,14 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct } re.q.clear(); } + bytes_count += write_encode_data(s, event, re, encoder_info); + delete msg; + } else if (re.q.size() > MAIN_FPS*10) { + LOGE_100("%s: dropping frame waiting for audio initialization, queue is too large", name.c_str()); + delete msg; + } else { + re.q.push_back(msg); // queue up all the new segment messages, they go in after audio is initialized } - bytes_count += write_encode_data(s, event, re, encoder_info); - delete msg; } else if (offset_segment_num > s->logger.segment()) { // encoderd packet has a newer segment, this means encoderd has rolled over if (!re.marked_ready_to_rotate) { @@ -214,7 +222,7 @@ void loggerd_thread() { typedef struct ServiceState { std::string name; int counter, freq; - bool encoder, user_flag; + bool encoder, user_flag, record_audio; } ServiceState; std::unordered_map service_state; std::unordered_map remote_encoders; @@ -239,6 +247,7 @@ void loggerd_thread() { .freq = it.decimation, .encoder = encoder, .user_flag = it.name == "userFlag", + .record_audio = record_audio, }; } } @@ -249,6 +258,7 @@ void loggerd_thread() { Params().put("CurrentRoute", s.logger.routeName()); std::map encoder_infos_dict; + std::vector encoders_with_audio; for (const auto &cam : cameras_logged) { for (const auto &encoder_info : cam.encoder_infos) { encoder_infos_dict[encoder_info.publish_name] = encoder_info; @@ -256,6 +266,13 @@ void loggerd_thread() { } } + for (auto &[sock, service] : service_state) { + auto it = encoder_infos_dict.find(service.name); + if (it != encoder_infos_dict.end() && it->second.include_audio) { + encoders_with_audio.push_back(&remote_encoders[sock]); + } + } + uint64_t msg_count = 0, bytes_count = 0; double start_ts = millis_since_boot(); while (!do_exit) { @@ -273,6 +290,20 @@ void loggerd_thread() { Message *msg = nullptr; while (!do_exit && (msg = sock->receive(true))) { const bool in_qlog = service.freq != -1 && (service.counter++ % service.freq == 0); + + if (service.record_audio) { + capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); + auto event = cmsg.getRoot(); + auto audio_data = event.getRawAudioData().getData(); + auto sample_rate = event.getRawAudioData().getSampleRate(); + for (auto* encoder : encoders_with_audio) { + if (encoder && encoder->writer) { + encoder->writer->write_audio((uint8_t*)audio_data.begin(), audio_data.size(), event.getLogMonoTime() / 1000, sample_rate); + encoder->audio_initialized = true; + } + } + } + if (service.encoder) { s.last_camera_seen_tms = millis_since_boot(); bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]); diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 27d2d37fc4..5dfb178fd5 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -35,6 +35,7 @@ public: const char *thumbnail_name = NULL; const char *filename = NULL; bool record = true; + bool include_audio = false; int frame_width = -1; int frame_height = -1; int fps = MAIN_FPS; @@ -106,6 +107,7 @@ const EncoderInfo qcam_encoder_info = { .encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .frame_width = 526, .frame_height = 330, + .include_audio = Params().getBool("RecordAudio"), INIT_ENCODE_FUNCTIONS(QRoadEncode), }; diff --git a/system/loggerd/video_writer.cc b/system/loggerd/video_writer.cc index 90b5f1af3d..68e870982f 100644 --- a/system/loggerd/video_writer.cc +++ b/system/loggerd/video_writer.cc @@ -50,6 +50,45 @@ VideoWriter::VideoWriter(const char *path, const char *filename, bool remuxing, } } +void VideoWriter::initialize_audio(int sample_rate) { + assert(this->ofmt_ctx->oformat->audio_codec != AV_CODEC_ID_NONE); // check output format supports audio streams + const AVCodec *audio_avcodec = avcodec_find_encoder(AV_CODEC_ID_AAC); + assert(audio_avcodec); + this->audio_codec_ctx = avcodec_alloc_context3(audio_avcodec); + assert(this->audio_codec_ctx); + this->audio_codec_ctx->sample_fmt = AV_SAMPLE_FMT_FLTP; + this->audio_codec_ctx->sample_rate = sample_rate; + #if LIBAVUTIL_VERSION_INT >= AV_VERSION_INT(57, 28, 100) // FFmpeg 5.1+ + av_channel_layout_default(&this->audio_codec_ctx->ch_layout, 1); + #else + this->audio_codec_ctx->channel_layout = AV_CH_LAYOUT_MONO; + #endif + this->audio_codec_ctx->bit_rate = 32000; + this->audio_codec_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER; + this->audio_codec_ctx->time_base = (AVRational){1, audio_codec_ctx->sample_rate}; + int err = avcodec_open2(this->audio_codec_ctx, audio_avcodec, NULL); + assert(err >= 0); + av_log_set_level(AV_LOG_WARNING); // hide "QAvg" info msgs at the end of every segment + + this->audio_stream = avformat_new_stream(this->ofmt_ctx, NULL); + assert(this->audio_stream); + err = avcodec_parameters_from_context(this->audio_stream->codecpar, this->audio_codec_ctx); + assert(err >= 0); + + this->audio_frame = av_frame_alloc(); + assert(this->audio_frame); + this->audio_frame->format = this->audio_codec_ctx->sample_fmt; + #if LIBAVUTIL_VERSION_INT >= AV_VERSION_INT(57, 28, 100) // FFmpeg 5.1+ + av_channel_layout_copy(&this->audio_frame->ch_layout, &this->audio_codec_ctx->ch_layout); + #else + this->audio_frame->channel_layout = this->audio_codec_ctx->channel_layout; + #endif + this->audio_frame->sample_rate = this->audio_codec_ctx->sample_rate; + this->audio_frame->nb_samples = this->audio_codec_ctx->frame_size; + err = av_frame_get_buffer(this->audio_frame, 0); + assert(err >= 0); +} + void VideoWriter::write(uint8_t *data, int len, long long timestamp, bool codecconfig, bool keyframe) { if (of && data) { size_t written = util::safe_fwrite(data, 1, len, of); @@ -67,8 +106,10 @@ void VideoWriter::write(uint8_t *data, int len, long long timestamp, bool codecc } int err = avcodec_parameters_from_context(out_stream->codecpar, codec_ctx); assert(err >= 0); + // if there is an audio stream, it must be initialized before this point err = avformat_write_header(ofmt_ctx, NULL); assert(err >= 0); + header_written = true; } else { // input timestamps are in microseconds AVRational in_timebase = {1, 1000000}; @@ -77,6 +118,7 @@ void VideoWriter::write(uint8_t *data, int len, long long timestamp, bool codecc av_init_packet(&pkt); pkt.data = data; pkt.size = len; + pkt.stream_index = this->out_stream->index; enum AVRounding rnd = static_cast(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); pkt.pts = pkt.dts = av_rescale_q_rnd(timestamp, in_timebase, ofmt_ctx->streams[0]->time_base, rnd); @@ -95,11 +137,80 @@ void VideoWriter::write(uint8_t *data, int len, long long timestamp, bool codecc } } +void VideoWriter::write_audio(uint8_t *data, int len, long long timestamp, int sample_rate) { + if (!remuxing) return; + if (!audio_initialized) { + initialize_audio(sample_rate); + audio_initialized = true; + } + if (!audio_codec_ctx) return; + // sync logMonoTime of first audio packet with the timestampEof of first video packet + if (audio_pts == 0) { + audio_pts = (timestamp * audio_codec_ctx->sample_rate) / 1000000ULL; + } + + // convert s16le samples to fltp and add to buffer + const int16_t *raw_samples = reinterpret_cast(data); + int sample_count = len / sizeof(int16_t); + constexpr float normalizer = 1.0f / 32768.0f; + + const size_t max_buffer_size = sample_rate * 10; // 10 seconds + if (audio_buffer.size() + sample_count > max_buffer_size) { + size_t samples_to_drop = (audio_buffer.size() + sample_count) - max_buffer_size; + LOGE("Audio buffer overflow, dropping %zu oldest samples", samples_to_drop); + audio_buffer.erase(audio_buffer.begin(), audio_buffer.begin() + samples_to_drop); + audio_pts += samples_to_drop; + } + + // Add new samples to the buffer + const size_t original_size = audio_buffer.size(); + audio_buffer.resize(original_size + sample_count); + std::transform(raw_samples, raw_samples + sample_count, audio_buffer.begin() + original_size, + [](int16_t sample) { return sample * normalizer; }); + + if (!header_written) return; // header not written yet, process audio frame after header is written + while (audio_buffer.size() >= audio_codec_ctx->frame_size) { + audio_frame->pts = audio_pts; + float *f_samples = reinterpret_cast(audio_frame->data[0]); + std::copy(audio_buffer.begin(), audio_buffer.begin() + audio_codec_ctx->frame_size, f_samples); + audio_buffer.erase(audio_buffer.begin(), audio_buffer.begin() + audio_codec_ctx->frame_size); + encode_and_write_audio_frame(audio_frame); + } +} + +void VideoWriter::encode_and_write_audio_frame(AVFrame* frame) { + if (!remuxing || !audio_codec_ctx) return; + int send_result = avcodec_send_frame(audio_codec_ctx, frame); // encode frame + if (send_result >= 0) { + AVPacket *pkt = av_packet_alloc(); + while (avcodec_receive_packet(audio_codec_ctx, pkt) == 0) { + av_packet_rescale_ts(pkt, audio_codec_ctx->time_base, audio_stream->time_base); + pkt->stream_index = audio_stream->index; + + int err = av_interleaved_write_frame(ofmt_ctx, pkt); // write encoded frame + if (err < 0) { + LOGW("AUDIO: Write frame failed - error: %d", err); + } + av_packet_unref(pkt); + } + av_packet_free(&pkt); + } else { + LOGW("AUDIO: Failed to send audio frame to encoder: %d", send_result); + } + audio_pts += audio_codec_ctx->frame_size; +} + + VideoWriter::~VideoWriter() { if (this->remuxing) { + if (this->audio_codec_ctx) { + encode_and_write_audio_frame(NULL); // flush encoder + avcodec_free_context(&this->audio_codec_ctx); + } int err = av_write_trailer(this->ofmt_ctx); if (err != 0) LOGE("av_write_trailer failed %d", err); avcodec_free_context(&this->codec_ctx); + if (this->audio_frame) av_frame_free(&this->audio_frame); err = avio_closep(&this->ofmt_ctx->pb); if (err != 0) LOGE("avio_closep failed %d", err); avformat_free_context(this->ofmt_ctx); diff --git a/system/loggerd/video_writer.h b/system/loggerd/video_writer.h index 1aa758b42b..25e6484d58 100644 --- a/system/loggerd/video_writer.h +++ b/system/loggerd/video_writer.h @@ -1,6 +1,7 @@ #pragma once #include +#include extern "C" { #include @@ -13,13 +14,28 @@ class VideoWriter { public: VideoWriter(const char *path, const char *filename, bool remuxing, int width, int height, int fps, cereal::EncodeIndex::Type codec); void write(uint8_t *data, int len, long long timestamp, bool codecconfig, bool keyframe); + void write_audio(uint8_t *data, int len, long long timestamp, int sample_rate); + ~VideoWriter(); + private: + void initialize_audio(int sample_rate); + void encode_and_write_audio_frame(AVFrame* frame); + std::string vid_path, lock_path; FILE *of = nullptr; AVCodecContext *codec_ctx; AVFormatContext *ofmt_ctx; AVStream *out_stream; + + bool audio_initialized = false; + bool header_written = false; + AVStream *audio_stream = nullptr; + AVCodecContext *audio_codec_ctx = nullptr; + AVFrame *audio_frame = nullptr; + uint64_t audio_pts = 0; + std::deque audio_buffer; + bool remuxing; };