from  collections  import  namedtuple 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  msgq . visionipc  import  VisionStreamType 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . realtime  import  DT_MDL ,  DT_DMON 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . transformations . camera  import  DEVICE_CAMERAS 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								VideoStreamMeta  =  namedtuple ( " VideoStreamMeta " ,  [ " camera_state " ,  " encode_index " ,  " stream " ,  " dt " ,  " frame_sizes " ] ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								ROAD_CAMERA_FRAME_SIZES  =  { k :  ( v . dcam . width ,  v . dcam . height )  for  k ,  v  in  DEVICE_CAMERAS . items ( ) } 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								WIDE_ROAD_CAMERA_FRAME_SIZES  =  { k :  ( v . ecam . width ,  v . ecam . height )  for  k ,  v  in  DEVICE_CAMERAS . items ( )  if  v . ecam  is  not  None } 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								DRIVER_CAMERA_FRAME_SIZES  =  { k :  ( v . dcam . width ,  v . dcam . height )  for  k ,  v  in  DEVICE_CAMERAS . items ( ) } 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								VIPC_STREAM_METADATA  =  [ 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # metadata: (state_msg_type, encode_msg_type, stream_type, dt, frame_sizes) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " roadCameraState " ,  " roadEncodeIdx " ,  VisionStreamType . VISION_STREAM_ROAD ,  DT_MDL ,  ROAD_CAMERA_FRAME_SIZES ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " wideRoadCameraState " ,  " wideRoadEncodeIdx " ,  VisionStreamType . VISION_STREAM_WIDE_ROAD ,  DT_MDL ,  WIDE_ROAD_CAMERA_FRAME_SIZES ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  ( " driverCameraState " ,  " driverEncodeIdx " ,  VisionStreamType . VISION_STREAM_DRIVER ,  DT_DMON ,  DRIVER_CAMERA_FRAME_SIZES ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  meta_from_camera_state ( state ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  meta  =  next ( ( VideoStreamMeta ( * meta )  for  meta  in  VIPC_STREAM_METADATA  if  meta [ 0 ]  ==  state ) ,  None ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  meta 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  meta_from_encode_index ( encode_index ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  meta  =  next ( ( VideoStreamMeta ( * meta )  for  meta  in  VIPC_STREAM_METADATA  if  meta [ 1 ]  ==  encode_index ) ,  None ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  meta 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  meta_from_stream_type ( stream_type ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  meta  =  next ( ( VideoStreamMeta ( * meta )  for  meta  in  VIPC_STREAM_METADATA  if  meta [ 2 ]  ==  stream_type ) ,  None ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  meta 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  available_streams ( lr = None ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  lr  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  [ VideoStreamMeta ( * meta )  for  meta  in  VIPC_STREAM_METADATA ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  result  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  meta  in  VIPC_STREAM_METADATA : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    has_cam_state  =  next ( ( True  for  m  in  lr  if  m . which ( )  ==  meta [ 0 ] ) ,  False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  has_cam_state : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      result . append ( VideoStreamMeta ( * meta ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  result