# include <sys/xattr.h>
# include <map>
# include <memory>
# include <string>
# include <unordered_map>
# include <vector>
# include "common/params.h"
# include "system/loggerd/encoder/encoder.h"
# include "system/loggerd/loggerd.h"
# include "system/loggerd/video_writer.h"
// Process-wide exit flag. The main loop in loggerd_thread() polls it, and its
// `signal` and `power_failure` members are read during shutdown.
ExitHandler do_exit ;
// State shared by the logging loop: the active logger plus the bookkeeping
// that decides when to rotate to a new segment.
struct LoggerdState {
  LoggerState logger;
  // Time (ms since boot) of the most recent encoder packet. Explicitly
  // initialized because rotate_if_needed() may read it before any encoder
  // packet has been received.
  std::atomic<double> last_camera_seen_tms{0.0};
  std::atomic<int> ready_to_rotate{0};  // count of encoders ready to rotate
  int max_waiting = 0;                  // ready_to_rotate value that triggers a rotation
  double last_rotate_tms = 0.;          // last rotate time in ms
};
// Advance the logger to its next segment, reset the rotation bookkeeping and
// log the path of the segment that is now being written.
void logger_rotate(LoggerdState *s) {
  const bool advanced = s->logger.next();
  assert(advanced);

  s->ready_to_rotate = 0;
  s->last_rotate_tms = millis_since_boot();

  // segment index 0 gets the initial message, every later index the rotation one
  const bool is_initial_segment = (s->logger.segment() == 0);
  LOGW((is_initial_segment ? " logging to %s " : " rotated to %s "), s->logger.segmentPath().c_str());
}
// Rotate the logger when every encoder has reported ready, or when the
// fallback timeouts below decide the current segment has run too long.
void rotate_if_needed(LoggerdState *s) {
  // all encoders ready, trigger rotation
  const bool encoders_ready = (s->ready_to_rotate == s->max_waiting);

  const double now_ms = millis_since_boot();
  const double elapsed_secs = (now_ms - s->last_rotate_tms) / 1000.;

  // fallback logic to prevent extremely long segments in the case of camera, encoder, etc. malfunctions
  bool force_rotate = false;
  if ((elapsed_secs > SEGMENT_LENGTH) && !LOGGERD_TEST) {
    // TODO: might be nice to put these reasons in the sentinel
    const bool camera_silent = (now_ms - s->last_camera_seen_tms) > NO_CAMERA_PATIENCE;
    if (camera_silent) {
      force_rotate = true;
      LOGE(" no camera packets seen. auto rotating ");
    } else if (elapsed_secs > SEGMENT_LENGTH * 1.2) {
      force_rotate = true;
      LOGE(" segment too long. auto rotating ");
    }
  }

  if (!encoders_ready && !force_rotate) {
    return;
  }
  logger_rotate(s);
}
struct RemoteEncoder {
std : : unique_ptr < VideoWriter > writer ;
int encoderd_segment_offset ;
int current_segment = - 1 ;
std : : vector < Message * > q ;
int dropped_frames = 0 ;
bool recording = false ;
bool marked_ready_to_rotate = false ;
bool seen_first_packet = false ;
} ;
int handle_encoder_msg ( LoggerdState * s , Message * msg , std : : string & name , struct RemoteEncoder & re , const EncoderInfo & encoder_info ) {
int bytes_count = 0 ;
// extract the message
capnp : : FlatArrayMessageReader cmsg ( kj : : ArrayPtr < capnp : : word > ( ( capnp : : word * ) msg - > getData ( ) , msg - > getSize ( ) / sizeof ( capnp : : word ) ) ) ;
auto event = cmsg . getRoot < cereal : : Event > ( ) ;
auto edata = ( event . * ( encoder_info . get_encode_data_func ) ) ( ) ;
auto idx = edata . getIdx ( ) ;
auto flags = idx . getFlags ( ) ;
// encoderd can have started long before loggerd
if ( ! re . seen_first_packet ) {
re . seen_first_packet = true ;
re . encoderd_segment_offset = idx . getSegmentNum ( ) ;
LOGD ( " %s: has encoderd offset %d " , name . c_str ( ) , re . encoderd_segment_offset ) ;
}
int offset_segment_num = idx . getSegmentNum ( ) - re . encoderd_segment_offset ;
if ( offset_segment_num = = s - > logger . segment ( ) ) {
// loggerd is now on the segment that matches this packet
// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
if ( re . current_segment ! = s - > logger . segment ( ) ) {
if ( re . recording ) {
re . writer . reset ( ) ;
re . recording = false ;
}
re . current_segment = s - > logger . segment ( ) ;
re . marked_ready_to_rotate = false ;
// we are in this segment now, process any queued messages before this one
if ( ! re . q . empty ( ) ) {
for ( auto & qmsg : re . q ) {
bytes_count + = handle_encoder_msg ( s , qmsg , name , re , encoder_info ) ;
}
re . q . clear ( ) ;
}
}
// if we aren't recording yet, try to start, since we are in the correct segment
if ( ! re . recording ) {
if ( flags & V4L2_BUF_FLAG_KEYFRAME ) {
// only create on iframe
if ( re . dropped_frames ) {
// this should only happen for the first segment, maybe
LOGW ( " %s: dropped %d non iframe packets before init " , name . c_str ( ) , re . dropped_frames ) ;
re . dropped_frames = 0 ;
}
// if we aren't actually recording, don't create the writer
if ( encoder_info . record ) {
assert ( encoder_info . filename ! = NULL ) ;
re . writer . reset ( new VideoWriter ( s - > logger . segmentPath ( ) . c_str ( ) ,
encoder_info . filename , idx . getType ( ) ! = cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ,
edata . getWidth ( ) , edata . getHeight ( ) , encoder_info . fps , idx . getType ( ) ) ) ;
// write the header
auto header = edata . getHeader ( ) ;
re . writer - > write ( ( uint8_t * ) header . begin ( ) , header . size ( ) , idx . getTimestampEof ( ) / 1000 , true , false ) ;
}
re . recording = true ;
} else {
// this is a sad case when we aren't recording, but don't have an iframe
// nothing we can do but drop the frame
delete msg ;
+ + re . dropped_frames ;
return bytes_count ;
}
}
// we have to be recording if we are here
assert ( re . recording ) ;
// if we are actually writing the video file, do so
if ( re . writer ) {
auto data = edata . getData ( ) ;
re . writer - > write ( ( uint8_t * ) data . begin ( ) , data . size ( ) , idx . getTimestampEof ( ) / 1000 , false , flags & V4L2_BUF_FLAG_KEYFRAME ) ;
}
// put it in log stream as the idx packet
MessageBuilder bmsg ;
auto evt = bmsg . initEvent ( event . getValid ( ) ) ;
evt . setLogMonoTime ( event . getLogMonoTime ( ) ) ;
( evt . * ( encoder_info . set_encode_idx_func ) ) ( idx ) ;
auto new_msg = bmsg . toBytes ( ) ;
s - > logger . write ( ( uint8_t * ) new_msg . begin ( ) , new_msg . size ( ) , true ) ; // always in qlog?
bytes_count + = new_msg . size ( ) ;
// free the message, we used it
delete msg ;
} else if ( offset_segment_num > s - > logger . segment ( ) ) {
// encoderd packet has a newer segment, this means encoderd has rolled over
if ( ! re . marked_ready_to_rotate ) {
re . marked_ready_to_rotate = true ;
+ + s - > ready_to_rotate ;
LOGD ( " rotate %d -> %d ready %d/%d for %s " ,
s - > logger . segment ( ) , offset_segment_num ,
s - > ready_to_rotate . load ( ) , s - > max_waiting , name . c_str ( ) ) ;
}
// queue up all the new segment messages, they go in after the rotate
re . q . push_back ( msg ) ;
} else {
LOGE ( " %s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d " ,
name . c_str ( ) , idx . getSegmentNum ( ) , s - > logger . segment ( ) , re . encoderd_segment_offset ) ;
// free the message, it's useless. this should never happen
// actually, this can happen if you restart encoderd
re . encoderd_segment_offset = - s - > logger . segment ( ) ;
delete msg ;
}
return bytes_count ;
}
// React to a user flag: tag the current segment directory with the preserve
// extended attribute and append the route name to the recently-viewed routes
// param. Runs at most once per logger segment index.
void handle_user_flag(LoggerdState *s) {
  // segment index handled by the previous call; static so it persists across calls
  static int prev_segment = -1;
  if (s->logger.segment() == prev_segment) return;

  LOGW(" preserving %s ", s->logger.segmentPath().c_str());

  // macOS setxattr takes an extra `position` argument
#ifdef __APPLE__
  int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0);
#else
  int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0);
#endif
  if (ret) {
    // failure is logged but not fatal; the route is still marked below
    LOGE(" setxattr %s failed for %s: %s ", PRESERVE_ATTR_NAME, s->logger.segmentPath().c_str(), strerror(errno));
  }

  // mark route for uploading; read and write through the same Params instance
  Params params;
  std::string routes = params.get(" AthenadRecentlyViewedRoutes ");
  params.put(" AthenadRecentlyViewedRoutes ", routes + " , " + s->logger.routeName());

  prev_segment = s->logger.segment();
}
void loggerd_thread ( ) {
// setup messaging
typedef struct ServiceState {
std : : string name ;
int counter , freq ;
bool encoder , user_flag ;
} ServiceState ;
std : : unordered_map < SubSocket * , ServiceState > service_state ;
loggerd: switch to v4l encoder try 2 (#24380)
* start v4l encoder
* v4l encoder starts
* start and stop
* fill in proper controls
* it dequeued a buffer
* getting bytes
* it made a video
* it does make files
* getting close
* ahh, so that's how dequeue works
* qcam works (no remuxing)
* remuxing works
* we just need to make shutdown and rollover graceful
* graceful destruction
* switch to polling
* should work now
* fix pc build
* refactors, stop properly
* touchups, remove a copy
* add v4l encoder to release
* inlcude file
* move writing to it's own thread
* fix minor memory leak
* block instead of dropping frames
* add counter, fix tests maybe
* better debugging and test print
* print file path in assert
* format string in test
* no more oversized qlogs
* match qcam
* touchups, remove omx encoder
* remove omx include files
* checked ioctl, better debugging, open by name
* unused import
* move linux includes to third_party/linux/include
* simple encoderd
* full packet
* encoderd should be complete
* lagging print
* updates
* name dq thread
* subset idx
* video file writing works
* debug
* potential bugfix
* rotation works
* iframe
* keep writing support
* ci should pass
* loggerd, not encoderd
* remote encoder code
* support remote encoder
* cereal to master, add encoderd
* header no longer required
* put that back there
* realtime
* lower decoder latency
* don't use queue for VisionIpcBufExtra, disable realtime again
* assert all written
* hmm simpler
* only push to to_write if we are writing
* assert timestamp is right
* use at and remove assert
* revert to queue
Co-authored-by: Comma Device <device@comma.ai>
3 years ago
std : : unordered_map < SubSocket * , struct RemoteEncoder > remote_encoders ;
std : : unique_ptr < Context > ctx ( Context : : create ( ) ) ;
std : : unique_ptr < Poller > poller ( Poller : : create ( ) ) ;
// subscribe to all socks
for ( const auto & [ _ , it ] : services ) {
const bool encoder = util : : ends_with ( it . name , " EncodeData " ) ;
const bool livestream_encoder = util : : starts_with ( it . name , " livestream " ) ;
if ( ! it . should_log & & ( ! encoder | | livestream_encoder ) ) continue ;
LOGD ( " logging %s (on port %d) " , it . name . c_str ( ) , it . port ) ;
SubSocket * sock = SubSocket : : create ( ctx . get ( ) , it . name ) ;
assert ( sock ! = NULL ) ;
poller - > registerSocket ( sock ) ;
service_state [ sock ] = {
. name = it . name ,
. counter = 0 ,
. freq = it . decimation ,
loggerd: switch to v4l encoder try 2 (#24380)
* start v4l encoder
* v4l encoder starts
* start and stop
* fill in proper controls
* it dequeued a buffer
* getting bytes
* it made a video
* it does make files
* getting close
* ahh, so that's how dequeue works
* qcam works (no remuxing)
* remuxing works
* we just need to make shutdown and rollover graceful
* graceful destruction
* switch to polling
* should work now
* fix pc build
* refactors, stop properly
* touchups, remove a copy
* add v4l encoder to release
* inlcude file
* move writing to it's own thread
* fix minor memory leak
* block instead of dropping frames
* add counter, fix tests maybe
* better debugging and test print
* print file path in assert
* format string in test
* no more oversized qlogs
* match qcam
* touchups, remove omx encoder
* remove omx include files
* checked ioctl, better debugging, open by name
* unused import
* move linux includes to third_party/linux/include
* simple encoderd
* full packet
* encoderd should be complete
* lagging print
* updates
* name dq thread
* subset idx
* video file writing works
* debug
* potential bugfix
* rotation works
* iframe
* keep writing support
* ci should pass
* loggerd, not encoderd
* remote encoder code
* support remote encoder
* cereal to master, add encoderd
* header no longer required
* put that back there
* realtime
* lower decoder latency
* don't use queue for VisionIpcBufExtra, disable realtime again
* assert all written
* hmm simpler
* only push to to_write if we are writing
* assert timestamp is right
* use at and remove assert
* revert to queue
Co-authored-by: Comma Device <device@comma.ai>
3 years ago
. encoder = encoder ,
. user_flag = it . name = = " userFlag " ,
} ;
}
LoggerdState s ;
// init logger
logger_rotate ( & s ) ;
Params ( ) . put ( " CurrentRoute " , s . logger . routeName ( ) ) ;
std : : map < std : : string , EncoderInfo > encoder_infos_dict ;
for ( const auto & cam : cameras_logged ) {
for ( const auto & encoder_info : cam . encoder_infos ) {
encoder_infos_dict [ encoder_info . publish_name ] = encoder_info ;
s . max_waiting + + ;
}
}
uint64_t msg_count = 0 , bytes_count = 0 ;
double start_ts = millis_since_boot ( ) ;
while ( ! do_exit ) {
// poll for new messages on all sockets
for ( auto sock : poller - > poll ( 1000 ) ) {
if ( do_exit ) break ;
ServiceState & service = service_state [ sock ] ;
if ( service . user_flag ) {
handle_user_flag ( & s ) ;
}
// drain socket
int count = 0 ;
Message * msg = nullptr ;
while ( ! do_exit & & ( msg = sock - > receive ( true ) ) ) {
const bool in_qlog = service . freq ! = - 1 & & ( service . counter + + % service . freq = = 0 ) ;
if ( service . encoder ) {
s . last_camera_seen_tms = millis_since_boot ( ) ;
bytes_count + = handle_encoder_msg ( & s , msg , service . name , remote_encoders [ sock ] , encoder_infos_dict [ service . name ] ) ;
loggerd: switch to v4l encoder try 2 (#24380)
* start v4l encoder
* v4l encoder starts
* start and stop
* fill in proper controls
* it dequeued a buffer
* getting bytes
* it made a video
* it does make files
* getting close
* ahh, so that's how dequeue works
* qcam works (no remuxing)
* remuxing works
* we just need to make shutdown and rollover graceful
* graceful destruction
* switch to polling
* should work now
* fix pc build
* refactors, stop properly
* touchups, remove a copy
* add v4l encoder to release
* inlcude file
* move writing to it's own thread
* fix minor memory leak
* block instead of dropping frames
* add counter, fix tests maybe
* better debugging and test print
* print file path in assert
* format string in test
* no more oversized qlogs
* match qcam
* touchups, remove omx encoder
* remove omx include files
* checked ioctl, better debugging, open by name
* unused import
* move linux includes to third_party/linux/include
* simple encoderd
* full packet
* encoderd should be complete
* lagging print
* updates
* name dq thread
* subset idx
* video file writing works
* debug
* potential bugfix
* rotation works
* iframe
* keep writing support
* ci should pass
* loggerd, not encoderd
* remote encoder code
* support remote encoder
* cereal to master, add encoderd
* header no longer required
* put that back there
* realtime
* lower decoder latency
* don't use queue for VisionIpcBufExtra, disable realtime again
* assert all written
* hmm simpler
* only push to to_write if we are writing
* assert timestamp is right
* use at and remove assert
* revert to queue
Co-authored-by: Comma Device <device@comma.ai>
3 years ago
} else {
s . logger . write ( ( uint8_t * ) msg - > getData ( ) , msg - > getSize ( ) , in_qlog ) ;
loggerd: switch to v4l encoder try 2 (#24380)
* start v4l encoder
* v4l encoder starts
* start and stop
* fill in proper controls
* it dequeued a buffer
* getting bytes
* it made a video
* it does make files
* getting close
* ahh, so that's how dequeue works
* qcam works (no remuxing)
* remuxing works
* we just need to make shutdown and rollover graceful
* graceful destruction
* switch to polling
* should work now
* fix pc build
* refactors, stop properly
* touchups, remove a copy
* add v4l encoder to release
* inlcude file
* move writing to it's own thread
* fix minor memory leak
* block instead of dropping frames
* add counter, fix tests maybe
* better debugging and test print
* print file path in assert
* format string in test
* no more oversized qlogs
* match qcam
* touchups, remove omx encoder
* remove omx include files
* checked ioctl, better debugging, open by name
* unused import
* move linux includes to third_party/linux/include
* simple encoderd
* full packet
* encoderd should be complete
* lagging print
* updates
* name dq thread
* subset idx
* video file writing works
* debug
* potential bugfix
* rotation works
* iframe
* keep writing support
* ci should pass
* loggerd, not encoderd
* remote encoder code
* support remote encoder
* cereal to master, add encoderd
* header no longer required
* put that back there
* realtime
* lower decoder latency
* don't use queue for VisionIpcBufExtra, disable realtime again
* assert all written
* hmm simpler
* only push to to_write if we are writing
* assert timestamp is right
* use at and remove assert
* revert to queue
Co-authored-by: Comma Device <device@comma.ai>
3 years ago
bytes_count + = msg - > getSize ( ) ;
delete msg ;
}
rotate_if_needed ( & s ) ;
if ( ( + + msg_count % 1000 ) = = 0 ) {
double seconds = ( millis_since_boot ( ) - start_ts ) / 1000.0 ;
LOGD ( " % " PRIu64 " messages, %.2f msg/sec, %.2f KB/sec " , msg_count , msg_count / seconds , bytes_count * 0.001 / seconds ) ;
}
count + + ;
if ( count > = 200 ) {
LOGD ( " large volume of '%s' messages " , service . name . c_str ( ) ) ;
break ;
}
}
}
}
LOGW ( " closing logger " ) ;
s . logger . setExitSignal ( do_exit . signal ) ;
if ( do_exit . power_failure ) {
LOGE ( " power failure " ) ;
sync ( ) ;
LOGE ( " sync done " ) ;
}
// messaging cleanup
for ( auto & [ sock , service ] : service_state ) delete sock ;
}
int main ( int argc , char * * argv ) {
if ( ! Hardware : : PC ( ) ) {
int ret ;
ret = util : : set_core_affinity ( { 0 , 1 , 2 , 3 } ) ;
assert ( ret = = 0 ) ;
// TODO: why does this impact camerad timings?
//ret = util::set_realtime_priority(1);
//assert(ret == 0);
}
loggerd_thread ( ) ;
return 0 ;
}