You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							43 lines
						
					
					
						
							1.9 KiB
						
					
					
				
			
		
		
	
	
							43 lines
						
					
					
						
							1.9 KiB
						
					
					
				from collections import namedtuple
 | 
						|
from cereal.visionipc import VisionStreamType
 | 
						|
from common.realtime import DT_MDL, DT_DMON
 | 
						|
from common.transformations.camera import tici_f_frame_size, tici_d_frame_size, tici_e_frame_size, eon_f_frame_size, eon_d_frame_size
 | 
						|
 | 
						|
VideoStreamMeta = namedtuple("VideoStreamMeta", ["camera_state", "encode_index", "stream", "dt", "frame_sizes"])
 | 
						|
ROAD_CAMERA_FRAME_SIZES = {"tici": tici_f_frame_size, "tizi": tici_f_frame_size, "eon": eon_f_frame_size}
 | 
						|
WIDE_ROAD_CAMERA_FRAME_SIZES = {"tici": tici_e_frame_size, "tizi": tici_e_frame_size}
 | 
						|
DRIVER_FRAME_SIZES = {"tici": tici_d_frame_size, "tizi": tici_d_frame_size, "eon": eon_d_frame_size}
 | 
						|
VIPC_STREAM_METADATA = [
 | 
						|
  # metadata: (state_msg_type, encode_msg_type, stream_type, dt, frame_sizes)
 | 
						|
  ("roadCameraState", "roadEncodeIdx", VisionStreamType.VISION_STREAM_ROAD, DT_MDL, ROAD_CAMERA_FRAME_SIZES),
 | 
						|
  ("wideRoadCameraState", "wideRoadEncodeIdx", VisionStreamType.VISION_STREAM_WIDE_ROAD, DT_MDL, WIDE_ROAD_CAMERA_FRAME_SIZES),
 | 
						|
  ("driverCameraState", "driverEncodeIdx", VisionStreamType.VISION_STREAM_DRIVER, DT_DMON, DRIVER_FRAME_SIZES),
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
def meta_from_camera_state(state):
 | 
						|
  meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[0] == state), None)
 | 
						|
  return meta
 | 
						|
 | 
						|
 | 
						|
def meta_from_encode_index(encode_index):
 | 
						|
  meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[1] == encode_index), None)
 | 
						|
  return meta
 | 
						|
 | 
						|
 | 
						|
def meta_from_stream_type(stream_type):
 | 
						|
  meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[2] == stream_type), None)
 | 
						|
  return meta
 | 
						|
 | 
						|
 | 
						|
def available_streams(lr=None):
 | 
						|
  if lr is None:
 | 
						|
    return [VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA]
 | 
						|
 | 
						|
  result = []
 | 
						|
  for meta in VIPC_STREAM_METADATA:
 | 
						|
    has_cam_state = next((True for m in lr if m.which() == meta[0]), False)
 | 
						|
    if has_cam_state:
 | 
						|
      result.append(VideoStreamMeta(*meta))
 | 
						|
 | 
						|
  return result
 | 
						|
 |