@ -62,6 +62,7 @@ struct RemoteEncoder {
bool recording = false ;
bool recording = false ;
bool marked_ready_to_rotate = false ;
bool marked_ready_to_rotate = false ;
bool seen_first_packet = false ;
bool seen_first_packet = false ;
bool audio_initialized = false ;
} ;
} ;
size_t write_encode_data ( LoggerdState * s , cereal : : Event : : Reader event , RemoteEncoder & re , const EncoderInfo & encoder_info ) {
size_t write_encode_data ( LoggerdState * s , cereal : : Event : : Reader event , RemoteEncoder & re , const EncoderInfo & encoder_info ) {
@ -78,12 +79,7 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc
LOGW ( " %s: dropped %d non iframe packets before init " , encoder_info . publish_name , re . dropped_frames ) ;
LOGW ( " %s: dropped %d non iframe packets before init " , encoder_info . publish_name , re . dropped_frames ) ;
re . dropped_frames = 0 ;
re . dropped_frames = 0 ;
}
}
// if we aren't actually recording, don't create the writer
if ( encoder_info . record ) {
if ( encoder_info . record ) {
assert ( encoder_info . filename ! = NULL ) ;
re . writer . reset ( new VideoWriter ( s - > logger . segmentPath ( ) . c_str ( ) ,
encoder_info . filename , idx . getType ( ) ! = cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ,
edata . getWidth ( ) , edata . getHeight ( ) , encoder_info . fps , idx . getType ( ) ) ) ;
// write the header
// write the header
auto header = edata . getHeader ( ) ;
auto header = edata . getHeader ( ) ;
re . writer - > write ( ( uint8_t * ) header . begin ( ) , header . size ( ) , idx . getTimestampEof ( ) / 1000 , true , false ) ;
re . writer - > write ( ( uint8_t * ) header . begin ( ) , header . size ( ) , idx . getTimestampEof ( ) / 1000 , true , false ) ;
@ -138,12 +134,19 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
if ( re . current_segment ! = s - > logger . segment ( ) ) {
if ( re . current_segment ! = s - > logger . segment ( ) ) {
if ( re . recording ) {
// if we aren't actually recording, don't create the writer
re . writer . reset ( ) ;
if ( encoder_info . record ) {
assert ( encoder_info . filename ! = NULL ) ;
re . writer . reset ( new VideoWriter ( s - > logger . segmentPath ( ) . c_str ( ) ,
encoder_info . filename , idx . getType ( ) ! = cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ,
edata . getWidth ( ) , edata . getHeight ( ) , encoder_info . fps , idx . getType ( ) ) ) ;
re . recording = false ;
re . recording = false ;
re . audio_initialized = false ;
}
}
re . current_segment = s - > logger . segment ( ) ;
re . current_segment = s - > logger . segment ( ) ;
re . marked_ready_to_rotate = false ;
re . marked_ready_to_rotate = false ;
}
if ( re . audio_initialized | | ! encoder_info . include_audio ) {
// we are in this segment now, process any queued messages before this one
// we are in this segment now, process any queued messages before this one
if ( ! re . q . empty ( ) ) {
if ( ! re . q . empty ( ) ) {
for ( auto qmsg : re . q ) {
for ( auto qmsg : re . q ) {
@ -153,9 +156,14 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
}
}
re . q . clear ( ) ;
re . q . clear ( ) ;
}
}
}
bytes_count + = write_encode_data ( s , event , re , encoder_info ) ;
bytes_count + = write_encode_data ( s , event , re , encoder_info ) ;
delete msg ;
delete msg ;
} else if ( re . q . size ( ) > MAIN_FPS * 10 ) {
LOGE_100 ( " %s: dropping frame waiting for audio initialization, queue is too large " , name . c_str ( ) ) ;
delete msg ;
} else {
re . q . push_back ( msg ) ; // queue up all the new segment messages, they go in after audio is initialized
}
} else if ( offset_segment_num > s - > logger . segment ( ) ) {
} else if ( offset_segment_num > s - > logger . segment ( ) ) {
// encoderd packet has a newer segment, this means encoderd has rolled over
// encoderd packet has a newer segment, this means encoderd has rolled over
if ( ! re . marked_ready_to_rotate ) {
if ( ! re . marked_ready_to_rotate ) {
@ -214,7 +222,7 @@ void loggerd_thread() {
typedef struct ServiceState {
typedef struct ServiceState {
std : : string name ;
std : : string name ;
int counter , freq ;
int counter , freq ;
bool encoder , user_flag ;
bool encoder , user_flag , record_audio ;
} ServiceState ;
} ServiceState ;
std : : unordered_map < SubSocket * , ServiceState > service_state ;
std : : unordered_map < SubSocket * , ServiceState > service_state ;
std : : unordered_map < SubSocket * , struct RemoteEncoder > remote_encoders ;
std : : unordered_map < SubSocket * , struct RemoteEncoder > remote_encoders ;
@ -239,6 +247,7 @@ void loggerd_thread() {
. freq = it . decimation ,
. freq = it . decimation ,
. encoder = encoder ,
. encoder = encoder ,
. user_flag = it . name = = " userFlag " ,
. user_flag = it . name = = " userFlag " ,
. record_audio = record_audio ,
} ;
} ;
}
}
}
}
@ -249,6 +258,7 @@ void loggerd_thread() {
Params ( ) . put ( " CurrentRoute " , s . logger . routeName ( ) ) ;
Params ( ) . put ( " CurrentRoute " , s . logger . routeName ( ) ) ;
std : : map < std : : string , EncoderInfo > encoder_infos_dict ;
std : : map < std : : string , EncoderInfo > encoder_infos_dict ;
std : : vector < RemoteEncoder * > encoders_with_audio ;
for ( const auto & cam : cameras_logged ) {
for ( const auto & cam : cameras_logged ) {
for ( const auto & encoder_info : cam . encoder_infos ) {
for ( const auto & encoder_info : cam . encoder_infos ) {
encoder_infos_dict [ encoder_info . publish_name ] = encoder_info ;
encoder_infos_dict [ encoder_info . publish_name ] = encoder_info ;
@ -256,6 +266,13 @@ void loggerd_thread() {
}
}
}
}
for ( auto & [ sock , service ] : service_state ) {
auto it = encoder_infos_dict . find ( service . name ) ;
if ( it ! = encoder_infos_dict . end ( ) & & it - > second . include_audio ) {
encoders_with_audio . push_back ( & remote_encoders [ sock ] ) ;
}
}
uint64_t msg_count = 0 , bytes_count = 0 ;
uint64_t msg_count = 0 , bytes_count = 0 ;
double start_ts = millis_since_boot ( ) ;
double start_ts = millis_since_boot ( ) ;
while ( ! do_exit ) {
while ( ! do_exit ) {
@ -273,6 +290,20 @@ void loggerd_thread() {
Message * msg = nullptr ;
Message * msg = nullptr ;
while ( ! do_exit & & ( msg = sock - > receive ( true ) ) ) {
while ( ! do_exit & & ( msg = sock - > receive ( true ) ) ) {
const bool in_qlog = service . freq ! = - 1 & & ( service . counter + + % service . freq = = 0 ) ;
const bool in_qlog = service . freq ! = - 1 & & ( service . counter + + % service . freq = = 0 ) ;
if ( service . record_audio ) {
capnp : : FlatArrayMessageReader cmsg ( kj : : ArrayPtr < capnp : : word > ( ( capnp : : word * ) msg - > getData ( ) , msg - > getSize ( ) / sizeof ( capnp : : word ) ) ) ;
auto event = cmsg . getRoot < cereal : : Event > ( ) ;
auto audio_data = event . getRawAudioData ( ) . getData ( ) ;
auto sample_rate = event . getRawAudioData ( ) . getSampleRate ( ) ;
for ( auto * encoder : encoders_with_audio ) {
if ( encoder & & encoder - > writer ) {
encoder - > writer - > write_audio ( ( uint8_t * ) audio_data . begin ( ) , audio_data . size ( ) , event . getLogMonoTime ( ) / 1000 , sample_rate ) ;
encoder - > audio_initialized = true ;
}
}
}
if ( service . encoder ) {
if ( service . encoder ) {
s . last_camera_seen_tms = millis_since_boot ( ) ;
s . last_camera_seen_tms = millis_since_boot ( ) ;
bytes_count + = handle_encoder_msg ( & s , msg , service . name , remote_encoders [ sock ] , encoder_infos_dict [ service . name ] ) ;
bytes_count + = handle_encoder_msg ( & s , msg , service . name , remote_encoders [ sock ] , encoder_infos_dict [ service . name ] ) ;