@ -15,6 +15,7 @@
# include <random>
# include <string>
# include <thread>
# include <unordered_map>
# include "cereal/messaging/messaging.h"
# include "cereal/services.h"
@ -59,7 +60,8 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = {
. bitrate = MAIN_BITRATE ,
. is_h265 = true ,
. downscale = false ,
. has_qcamera = true
. has_qcamera = true ,
. trigger_rotate = true
} ,
[ LOG_CAMERA_ID_DCAMERA ] = {
. stream_type = VISION_STREAM_YUV_FRONT ,
@ -69,7 +71,8 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = {
. bitrate = DCAM_BITRATE ,
. is_h265 = true ,
. downscale = false ,
. has_qcamera = false
. has_qcamera = false ,
. trigger_rotate = Hardware : : TICI ( ) ,
} ,
[ LOG_CAMERA_ID_ECAMERA ] = {
. stream_type = VISION_STREAM_YUV_WIDE ,
@ -79,7 +82,8 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = {
. bitrate = MAIN_BITRATE ,
. is_h265 = true ,
. downscale = false ,
. has_qcamera = false
. has_qcamera = false ,
. trigger_rotate = true
} ,
[ LOG_CAMERA_ID_QCAMERA ] = {
. filename = " qcamera.ts " ,
@ -92,81 +96,27 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = {
} ,
} ;
class RotateState {
public :
SubSocket * fpkt_sock ;
uint32_t stream_frame_id , log_frame_id , last_rotate_frame_id ;
bool enabled , should_rotate , initialized ;
std : : atomic < bool > rotating ;
std : : atomic < int > cur_seg ;
RotateState ( ) : fpkt_sock ( nullptr ) , stream_frame_id ( 0 ) , log_frame_id ( 0 ) ,
last_rotate_frame_id ( UINT32_MAX ) , enabled ( false ) , should_rotate ( false ) , initialized ( false ) , rotating ( false ) , cur_seg ( - 1 ) { } ;
void waitLogThread ( ) {
std : : unique_lock < std : : mutex > lk ( fid_lock ) ;
while ( stream_frame_id > log_frame_id // if the log camera is older, wait for it to catch up.
& & ( stream_frame_id - log_frame_id ) < 8 // but if its too old then there probably was a discontinuity (visiond restarted)
& & ! do_exit ) {
cv . wait ( lk ) ;
}
}
void cancelWait ( ) {
cv . notify_one ( ) ;
}
void setStreamFrameId ( uint32_t frame_id ) {
fid_lock . lock ( ) ;
stream_frame_id = frame_id ;
fid_lock . unlock ( ) ;
cv . notify_one ( ) ;
}
void setLogFrameId ( uint32_t frame_id ) {
fid_lock . lock ( ) ;
log_frame_id = frame_id ;
fid_lock . unlock ( ) ;
cv . notify_one ( ) ;
}
void rotate ( ) {
if ( enabled ) {
std : : unique_lock < std : : mutex > lk ( fid_lock ) ;
should_rotate = true ;
last_rotate_frame_id = stream_frame_id ;
}
}
void finish_rotate ( ) {
std : : unique_lock < std : : mutex > lk ( fid_lock ) ;
should_rotate = false ;
}
private :
std : : mutex fid_lock ;
std : : condition_variable cv ;
} ;
struct LoggerdState {
Context * ctx ;
LoggerState logger = { } ;
char segment_path [ 4096 ] ;
int rotate_segment ;
pthread_mutex_t rotate_lock ;
RotateState rotate_state [ LOG_CAMERA_ID_MAX - 1 ] ;
std : : mutex rotate_lock ;
std : : condition_variable rotate_cv ;
std : : atomic < int > rotate_segment ;
std : : atomic < double > last_camera_seen_tms ;
std : : atomic < int > waiting_rotate ;
int max_waiting = 0 ;
double last_rotate_tms = 0. ;
} ;
LoggerdState s ;
void encoder_thread ( int cam_idx ) {
assert ( cam_idx < LOG_CAMERA_ID_MAX - 1 ) ;
LogCameraInfo & cam_info = cameras_logged [ cam_idx ] ;
RotateState & rotate_state = s . rotate_state [ cam_idx ] ;
const LogCameraInfo & cam_info = cameras_logged [ cam_idx ] ;
set_thread_name ( cam_info . filename ) ;
int cnt = 0 ;
int cnt = 0 , cur_seg = - 1 ;
int encode_idx = 0 ;
LoggerHandle * lh = NULL ;
std : : vector < Encoder * > encoders ;
VisionIpcClient vipc_client = VisionIpcClient ( " camerad " , cam_info . stream_type , false ) ;
@ -198,68 +148,47 @@ void encoder_thread(int cam_idx) {
while ( ! do_exit ) {
VisionIpcBufExtra extra ;
VisionBuf * buf = vipc_client . recv ( & extra ) ;
if ( buf = = nullptr ) {
continue ;
}
//printf("logger latency to tsEof: %f\n", (double)(nanos_since_boot() - extra.timestamp_eof) / 1000000.0);
// all the rotation stuff
{
pthread_mutex_lock ( & s . rotate_lock ) ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
// wait if camera pkt id is older than stream
rotate_state . waitLogThread ( ) ;
if ( do_exit ) break ;
// rotate the encoder if the logger is on a newer segment
if ( rotate_state . should_rotate ) {
LOGW ( " camera %d rotate encoder to %s " , cam_idx , s . segment_path ) ;
if ( buf = = nullptr ) continue ;
if ( ! rotate_state . initialized ) {
rotate_state . last_rotate_frame_id = extra . frame_id - 1 ;
rotate_state . initialized = true ;
if ( cam_info . trigger_rotate ) {
s . last_camera_seen_tms = millis_since_boot ( ) ;
}
// get new logger handle for new segment
if ( lh ) {
lh_close ( lh ) ;
if ( cam_info . trigger_rotate & & ( cnt > = SEGMENT_LENGTH * MAIN_FPS ) ) {
// trigger rotate and wait logger rotated to new segment
+ + s . waiting_rotate ;
std : : unique_lock lk ( s . rotate_lock ) ;
s . rotate_cv . wait ( lk , [ & ] { return s . rotate_segment > cur_seg | | do_exit ; } ) ;
}
lh = logger_get_handle ( & s . logger ) ;
if ( do_exit ) break ;
// wait for all to start rotating
rotate_state . rotating = true ;
for ( auto & r : s . rotate_state ) {
while ( r . enabled & & ! r . rotating & & ! do_exit ) util : : sleep_for ( 5 ) ;
}
// rotate the encoder if the logger is on a newer segment
if ( s . rotate_segment > cur_seg ) {
cur_seg = s . rotate_segment ;
cnt = 0 ;
pthread_mutex_lock ( & s . rotate_lock ) ;
LOGW ( " camera %d rotate encoder to %s " , cam_idx , s . segment_path ) ;
for ( auto & e : encoders ) {
e - > encoder_close ( ) ;
e - > encoder_open ( s . segment_path ) ;
}
rotate_state . cur_seg = s . rotate_segment ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
// wait for all to finish rotating
for ( auto & r : s . rotate_state ) {
while ( r . enabled & & r . cur_seg ! = s . rotate_segment & & ! do_exit ) util : : sleep_for ( 5 ) ;
}
rotate_state . rotating = false ;
rotate_state . finish_rotate ( ) ;
if ( lh ) {
lh_close ( lh ) ;
}
lh = logger_get_handle ( & s . logger ) ;
}
rotate_state . setStreamFrameId ( extra . frame_id ) ;
// encode a frame
for ( int i = 0 ; i < encoders . size ( ) ; + + i ) {
int out_id = encoders [ i ] - > encode_frame ( buf - > y , buf - > u , buf - > v ,
buf - > width , buf - > height , extra . timestamp_eof ) ;
if ( i = = 0 & & out_id ! = - 1 ) {
if ( out_id = = - 1 ) {
LOGE ( " Failed to encode frame. frame_id: %d encode_id: %d " , extra . frame_id , encode_idx ) ;
}
// publish encode index
if ( i = = 0 & & out_id ! = - 1 ) {
MessageBuilder msg ;
// this is really ugly
auto eidx = cam_idx = = LOG_CAMERA_ID_DCAMERA ? msg . initEvent ( ) . initDriverEncodeIdx ( ) :
@ -272,8 +201,8 @@ void encoder_thread(int cam_idx) {
} else {
eidx . setType ( cam_idx = = LOG_CAMERA_ID_DCAMERA ? cereal : : EncodeIndex : : Type : : FRONT : cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ) ;
}
eidx . setEncodeId ( cnt ) ;
eidx . setSegmentNum ( rotate_state . cur_seg ) ;
eidx . setEncodeId ( encode_idx ) ;
eidx . setSegmentNum ( cur_seg ) ;
eidx . setSegmentId ( out_id ) ;
if ( lh ) {
// TODO: this should read cereal/services.h for qlog decimation
@ -284,6 +213,7 @@ void encoder_thread(int cam_idx) {
}
cnt + + ;
encode_idx + + ;
}
if ( lh ) {
@ -311,6 +241,33 @@ void clear_locks() {
ftw ( LOG_ROOT . c_str ( ) , clear_locks_fn , 16 ) ;
}
void logger_rotate ( ) {
{
std : : unique_lock lk ( s . rotate_lock ) ;
int segment = - 1 ;
int err = logger_next ( & s . logger , LOG_ROOT . c_str ( ) , s . segment_path , sizeof ( s . segment_path ) , & segment ) ;
assert ( err = = 0 ) ;
s . rotate_segment = segment ;
s . waiting_rotate = 0 ;
s . last_rotate_tms = millis_since_boot ( ) ;
}
s . rotate_cv . notify_all ( ) ;
LOGW ( ( s . logger . part = = 0 ) ? " logging to %s " : " rotated to %s " , s . segment_path ) ;
}
void rotate_if_needed ( ) {
if ( s . waiting_rotate = = s . max_waiting ) {
logger_rotate ( ) ;
}
double tms = millis_since_boot ( ) ;
if ( ( tms - s . last_rotate_tms ) > SEGMENT_LENGTH * 1000 & &
( tms - s . last_camera_seen_tms ) > NO_CAMERA_PATIENCE ) {
LOGW ( " no camera packet seen. auto rotating " ) ;
logger_rotate ( ) ;
}
}
} // namespace
int main ( int argc , char * * argv ) {
@ -322,11 +279,10 @@ int main(int argc, char** argv) {
typedef struct QlogState {
int counter , freq ;
} QlogState ;
std : : map < SubSocket * , QlogState > qlog_states ;
std : : unordered_ map< SubSocket * , QlogState > qlog_states ;
s . ctx = Context : : create ( ) ;
Poller * poller = Poller : : create ( ) ;
std : : vector < SubSocket * > socks ;
// subscribe to all socks
for ( const auto & it : services ) {
@ -335,13 +291,6 @@ int main(int argc, char** argv) {
SubSocket * sock = SubSocket : : create ( s . ctx , it . name ) ;
assert ( sock ! = NULL ) ;
poller - > registerSocket ( sock ) ;
socks . push_back ( sock ) ;
for ( int cid = 0 ; cid < = MAX_CAM_IDX ; cid + + ) {
if ( std : : string ( it . name ) = = cameras_logged [ cid ] . frame_packet_name ) {
s . rotate_state [ cid ] . fpkt_sock = sock ;
}
}
qlog_states [ sock ] = { . counter = 0 , . freq = it . decimation } ;
}
@ -349,124 +298,57 @@ int main(int argc, char** argv) {
// init logger
logger_init ( & s . logger , " rlog " , true ) ;
logger_rotate ( ) ;
params . put ( " CurrentRoute " , s . logger . route_name ) ;
// init encoders
pthread_mutex_init ( & s . rotate_lock , NULL ) ;
s . last_camera_seen_tms = millis_since_boot ( ) ;
// TODO: create these threads dynamically on frame packet presence
std : : vector < std : : thread > encoder_threads ;
encoder_threads . push_back ( std : : thread ( encoder_thread , LOG_CAMERA_ID_FCAMERA ) ) ;
s . rotate_state [ LOG_CAMERA_ID_FCAMERA ] . enabled = true ;
if ( cameras_logged [ LOG_CAMERA_ID_FCAMERA ] . trigger_rotate ) {
s . max_waiting + = 1 ;
}
if ( ! Hardware : : PC ( ) & & params . getBool ( " RecordFront " ) ) {
encoder_threads . push_back ( std : : thread ( encoder_thread , LOG_CAMERA_ID_DCAMERA ) ) ;
s . rotate_state [ LOG_CAMERA_ID_DCAMERA ] . enabled = true ;
if ( cameras_logged [ LOG_CAMERA_ID_DCAMERA ] . trigger_rotate ) {
s . max_waiting + = 1 ;
}
}
if ( Hardware : : TICI ( ) ) {
encoder_threads . push_back ( std : : thread ( encoder_thread , LOG_CAMERA_ID_ECAMERA ) ) ;
s . rotate_state [ LOG_CAMERA_ID_ECAMERA ] . enabled = true ;
if ( cameras_logged [ LOG_CAMERA_ID_ECAMERA ] . trigger_rotate ) {
s . max_waiting + = 1 ;
}
}
uint64_t msg_count = 0 ;
uint64_t bytes_count = 0 ;
AlignedBuffer aligned_buf ;
double start_ts = seconds_since_boot ( ) ;
double last_rotate_tms = millis_since_boot ( ) ;
double last_camera_seen_tms = millis_since_boot ( ) ;
uint64_t msg_count = 0 , bytes_count = 0 ;
double start_ts = millis_since_boot ( ) ;
while ( ! do_exit ) {
// TODO: fix msgs from the first poll getting dropped
// poll for new messages on all sockets
for ( auto sock : poller - > poll ( 1000 ) ) {
// drain socket
Message * last_msg = nullptr ;
while ( ! do_exit ) {
Message * msg = sock - > receive ( true ) ;
if ( ! msg ) {
break ;
}
delete last_msg ;
last_msg = msg ;
QlogState & qs = qlog_states [ sock ] ;
logger_log ( & s . logger , ( uint8_t * ) msg - > getData ( ) , msg - > getSize ( ) , qs . counter = = 0 & & qs . freq ! = - 1 ) ;
if ( qs . freq ! = - 1 ) {
qs . counter = ( qs . counter + 1 ) % qs . freq ;
}
Message * msg = nullptr ;
while ( ! do_exit & & ( msg = sock - > receive ( true ) ) ) {
const bool in_qlog = qs . freq ! = - 1 & & ( qs . counter + + % qs . freq = = 0 ) ;
logger_log ( & s . logger , ( uint8_t * ) msg - > getData ( ) , msg - > getSize ( ) , in_qlog ) ;
bytes_count + = msg - > getSize ( ) ;
if ( ( + + msg_count % 1000 ) = = 0 ) {
double ts = seconds_since_boot ( ) ;
LOGD ( " %lu messages, %.2f msg/sec, %.2f KB/sec " , msg_count , msg_count * 1.0 / ( ts - start_ts ) , bytes_count * 0.001 / ( ts - start_ts ) ) ;
}
}
delete msg ;
if ( last_msg ) {
int fpkt_id = - 1 ;
for ( int cid = 0 ; cid < = MAX_CAM_IDX ; cid + + ) {
if ( sock = = s . rotate_state [ cid ] . fpkt_sock ) {
fpkt_id = cid ;
break ;
}
}
if ( fpkt_id > = 0 ) {
// track camera frames to sync to encoder
// only process last frame
capnp : : FlatArrayMessageReader cmsg ( aligned_buf . align ( last_msg ) ) ;
cereal : : Event : : Reader event = cmsg . getRoot < cereal : : Event > ( ) ;
rotate_if_needed ( ) ;
if ( fpkt_id = = LOG_CAMERA_ID_FCAMERA ) {
s . rotate_state [ fpkt_id ] . setLogFrameId ( event . getRoadCameraState ( ) . getFrameId ( ) ) ;
} else if ( fpkt_id = = LOG_CAMERA_ID_DCAMERA ) {
s . rotate_state [ fpkt_id ] . setLogFrameId ( event . getDriverCameraState ( ) . getFrameId ( ) ) ;
} else if ( fpkt_id = = LOG_CAMERA_ID_ECAMERA ) {
s . rotate_state [ fpkt_id ] . setLogFrameId ( event . getWideRoadCameraState ( ) . getFrameId ( ) ) ;
}
last_camera_seen_tms = millis_since_boot ( ) ;
}
}
delete last_msg ;
}
bool new_segment = s . logger . part = = - 1 ;
if ( s . logger . part > - 1 ) {
double tms = millis_since_boot ( ) ;
if ( tms - last_camera_seen_tms < = NO_CAMERA_PATIENCE & & encoder_threads . size ( ) > 0 ) {
new_segment = true ;
for ( auto & r : s . rotate_state ) {
// this *should* be redundant on tici since all camera frames are synced
new_segment & = ( ( ( r . stream_frame_id > = r . last_rotate_frame_id + SEGMENT_LENGTH * MAIN_FPS ) & &
( ! r . should_rotate ) & & ( r . initialized ) ) | |
( ! r . enabled ) ) ;
if ( ! Hardware : : TICI ( ) ) break ; // only look at fcamera frame id if not QCOM2
}
} else {
if ( tms - last_rotate_tms > SEGMENT_LENGTH * 1000 ) {
new_segment = true ;
LOGW ( " no camera packet seen. auto rotated " ) ;
}
if ( ( + + msg_count % 1000 ) = = 0 ) {
double seconds = ( millis_since_boot ( ) - start_ts ) / 1000.0 ;
LOGD ( " %lu messages, %.2f msg/sec, %.2f KB/sec " , msg_count , msg_count / seconds , bytes_count * 0.001 / seconds ) ;
}
}
// rotate to new segment
if ( new_segment ) {
pthread_mutex_lock ( & s . rotate_lock ) ;
last_rotate_tms = millis_since_boot ( ) ;
int err = logger_next ( & s . logger , LOG_ROOT . c_str ( ) , s . segment_path , sizeof ( s . segment_path ) , & s . rotate_segment ) ;
assert ( err = = 0 ) ;
LOGW ( ( s . logger . part = = 0 ) ? " logging to %s " : " rotated to %s " , s . segment_path ) ;
// rotate encoders
for ( auto & r : s . rotate_state ) r . rotate ( ) ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
}
}
LOGW ( " closing encoders " ) ;
for ( auto & r : s . rotate_state ) r . cancelWait ( ) ;
s . rotate_cv . notify_all ( ) ;
for ( auto & t : encoder_threads ) t . join ( ) ;
LOGW ( " closing logger " ) ;
@ -479,7 +361,7 @@ int main(int argc, char** argv) {
}
// messaging cleanup
for ( auto sock : sock s) delete sock ;
for ( auto & [ sock , qs ] : qlog_state s) delete sock ;
delete poller ;
delete s . ctx ;