@ -13,8 +13,16 @@ from cereal.visionipc import VisionIpcServer, VisionStreamType
W , H = 1928 , 1208
W , H = 1928 , 1208
V4L2_BUF_FLAG_KEYFRAME = 8
V4L2_BUF_FLAG_KEYFRAME = 8
def decoder ( addr , sock_name , vipc_server , vst , nvidia ) :
ENCODE_SOCKETS = {
print ( " start decoder for %s " % sock_name )
VisionStreamType . VISION_STREAM_ROAD : " roadEncodeData " ,
VisionStreamType . VISION_STREAM_WIDE_ROAD : " wideRoadEncodeData " ,
VisionStreamType . VISION_STREAM_DRIVER : " driverEncodeData " ,
}
def decoder ( addr , vipc_server , vst , nvidia , debug = False ) :
sock_name = ENCODE_SOCKETS [ vst ]
if debug :
print ( " start decoder for %s " % sock_name )
if nvidia :
if nvidia :
os . environ [ " NV_LOW_LATENCY " ] = " 3 " # both bLowLatency and CUVID_PKT_ENDOFPICTURE
os . environ [ " NV_LOW_LATENCY " ] = " 3 " # both bLowLatency and CUVID_PKT_ENDOFPICTURE
sys . path + = os . environ [ " LD_LIBRARY_PATH " ] . split ( " : " )
sys . path + = os . environ [ " LD_LIBRARY_PATH " ] . split ( " : " )
@ -40,11 +48,12 @@ def decoder(addr, sock_name, vipc_server, vst, nvidia):
msgs = messaging . drain_sock ( sock , wait_for_one = True )
msgs = messaging . drain_sock ( sock , wait_for_one = True )
for evt in msgs :
for evt in msgs :
evta = getattr ( evt , evt . which ( ) )
evta = getattr ( evt , evt . which ( ) )
if evta . idx . encodeId != 0 and evta . idx . encodeId != ( last_idx + 1 ) :
if debug and evta . idx . encodeId != 0 and evta . idx . encodeId != ( last_idx + 1 ) :
print ( " DROP PACKET! " )
print ( " DROP PACKET! " )
last_idx = evta . idx . encodeId
last_idx = evta . idx . encodeId
if not seen_iframe and not ( evta . idx . flags & V4L2_BUF_FLAG_KEYFRAME ) :
if not seen_iframe and not ( evta . idx . flags & V4L2_BUF_FLAG_KEYFRAME ) :
print ( " waiting for iframe " )
if debug :
print ( " waiting for iframe " )
continue
continue
time_q . append ( time . monotonic ( ) )
time_q . append ( time . monotonic ( ) )
network_latency = ( int ( time . time ( ) * 1e9 ) - evta . unixTimestampNanos ) / 1e6
network_latency = ( int ( time . time ( ) * 1e9 ) - evta . unixTimestampNanos ) / 1e6
@ -62,14 +71,16 @@ def decoder(addr, sock_name, vipc_server, vst, nvidia):
if nvidia :
if nvidia :
rawSurface = nvDec . DecodeSurfaceFromPacket ( np . frombuffer ( evta . data , dtype = np . uint8 ) )
rawSurface = nvDec . DecodeSurfaceFromPacket ( np . frombuffer ( evta . data , dtype = np . uint8 ) )
if rawSurface . Empty ( ) :
if rawSurface . Empty ( ) :
print ( " DROP SURFACE " )
if debug :
print ( " DROP SURFACE " )
continue
continue
convSurface = conv_yuv . Execute ( rawSurface , cc1 )
convSurface = conv_yuv . Execute ( rawSurface , cc1 )
nvDwn_yuv . DownloadSingleSurface ( convSurface , img_yuv )
nvDwn_yuv . DownloadSingleSurface ( convSurface , img_yuv )
else :
else :
frames = codec . decode ( av . packet . Packet ( evta . data ) )
frames = codec . decode ( av . packet . Packet ( evta . data ) )
if len ( frames ) == 0 :
if len ( frames ) == 0 :
print ( " DROP SURFACE " )
if debug :
print ( " DROP SURFACE " )
continue
continue
assert len ( frames ) == 1
assert len ( frames ) == 1
img_yuv = frames [ 0 ] . to_ndarray ( format = av . video . format . VideoFormat ( ' yuv420p ' ) ) . flatten ( )
img_yuv = frames [ 0 ] . to_ndarray ( format = av . video . format . VideoFormat ( ' yuv420p ' ) ) . flatten ( )
@ -83,34 +94,45 @@ def decoder(addr, sock_name, vipc_server, vst, nvidia):
pc_latency = ( time . monotonic ( ) - time_q [ 0 ] ) * 1000
pc_latency = ( time . monotonic ( ) - time_q [ 0 ] ) * 1000
time_q = time_q [ 1 : ]
time_q = time_q [ 1 : ]
print ( " %2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms " % ( len ( msgs ) , evta . idx . encodeId , evt . logMonoTime / 1e9 , evta . idx . timestampEof / 1e6 , frame_latency , process_latency , network_latency , pc_latency , process_latency + network_latency + pc_latency ) , len ( evta . data ) , sock_name )
if debug :
print ( " %2d %4d %.3f %.3f roll %6.2f ms latency %6.2f ms + %6.2f ms + %6.2f ms = %6.2f ms " % ( len ( msgs ) , evta . idx . encodeId , evt . logMonoTime / 1e9 , evta . idx . timestampEof / 1e6 , frame_latency , process_latency , network_latency , pc_latency , process_latency + network_latency + pc_latency ) , len ( evta . data ) , sock_name )
class CompressedVipc :
def __init__ ( self , addr , vision_streams , nvidia = False , debug = False ) :
self . vipc_server = VisionIpcServer ( " camerad " )
for vst in vision_streams :
self . vipc_server . create_buffers ( vst , 4 , False , W , H )
self . vipc_server . start_listener ( )
def main ( addr , cams , nvidia = False ) :
self . procs = [ ]
vipc_server = VisionIpcServer ( " camerad " )
for vst in vision_streams :
for vst in cams . values ( ) :
p = multiprocessing . Process ( target = decoder , args = ( addr , self . vipc_server , vst , nvidia , debug ) )
vipc_server . create_buffers ( vst , 4 , False , W , H )
p . start ( )
vipc_server . start_listener ( )
self . procs . append ( p )
procs = [ ]
def join ( self ) :
for k , v in cams . items ( ) :
for p in self . procs :
p = multiprocessing . Process ( target = decoder , args = ( addr , k , vipc_server , v , nvidia ) )
p . join ( )
p . start ( )
procs . append ( p )
for p in procs :
def kill ( self ) :
p . join ( )
for p in self . procs :
p . terminate ( )
self . join ( )
if __name__ == " __main__ " :
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( description = " Decode video streams and broadcast on VisionIPC " )
parser = argparse . ArgumentParser ( description = " Decode video streams and broadcast on VisionIPC " )
parser . add_argument ( " addr " , help = " Address of comma three " )
parser . add_argument ( " addr " , help = " Address of comma three " )
parser . add_argument ( " --nvidia " , action = " store_true " , help = " Use nvidia instead of ffmpeg " )
parser . add_argument ( " --nvidia " , action = " store_true " , help = " Use nvidia instead of ffmpeg " )
parser . add_argument ( " --cams " , default = " 0,1,2 " , help = " Cameras to decode " )
parser . add_argument ( " --cams " , default = " 0,1,2 " , help = " Cameras to decode " )
parser . add_argument ( " --silent " , action = " store_true " , help = " Suppress debug output " )
args = parser . parse_args ( )
args = parser . parse_args ( )
all_c ams = [
vision_stre ams = [
( " roadEncodeData " , VisionStreamType . VISION_STREAM_ROAD ) ,
VisionStreamType . VISION_STREAM_ROAD ,
( " wideRoadEncodeData " , VisionStreamType . VISION_STREAM_WIDE_ROAD ) ,
VisionStreamType . VISION_STREAM_WIDE_ROAD ,
( " driverEncodeData " , VisionStreamType . VISION_STREAM_DRIVER ) ,
VisionStreamType . VISION_STREAM_DRIVER ,
]
]
cams = dict ( [ all_cams [ int ( x ) ] for x in args . cams . split ( " , " ) ] )
main ( args . addr , cams , args . nvidia )
vsts = [ vision_streams [ int ( x ) ] for x in args . cams . split ( " , " ) ]
cvipc = CompressedVipc ( args . addr , vsts , args . nvidia , debug = ( not args . silent ) )
cvipc . join ( )