|  |  |  | @ -14,15 +14,17 @@ from cereal import messaging, log | 
			
		
	
		
			
				
					|  |  |  |  | from msgq.visionipc import VisionIpcServer, VisionStreamType | 
			
		
	
		
			
				
					|  |  |  |  | from cereal.messaging import SubMaster, PubMaster | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.common.params import Params | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.common.transformations.camera import DEVICE_CAMERAS | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.common.transformations.camera import CameraConfig, DEVICE_CAMERAS | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.test.helpers import with_processes | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.tools.lib.logreader import LogReader | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.tools.lib.framereader import FrameReader | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.tools.lib.route import Route | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | UI_DELAY = 0.5 # may be slower on CI? | 
			
		
	
		
			
				
					|  |  |  |  | TEST_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19" | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | CAM = DEVICE_CAMERAS[("tici", "ar0231")] | 
			
		
	
		
			
				
					|  |  |  |  | DATA: dict[str, capnp.lib.capnp._DynamicStructBuilder | None] = dict.fromkeys( | 
			
		
	
		
			
				
					|  |  |  |  | STREAMS: list[tuple[VisionStreamType, CameraConfig, bytes]] = [] | 
			
		
	
		
			
				
					|  |  |  |  | DATA: dict[str, capnp.lib.capnp._DynamicStructBuilder] = dict.fromkeys( | 
			
		
	
		
			
				
					|  |  |  |  |   ["deviceState", "pandaStates", "controlsState", "liveCalibration", | 
			
		
	
		
			
				
					|  |  |  |  |   "modelV2", "radarState", "driverMonitoringState", | 
			
		
	
		
			
				
					|  |  |  |  |   "carState", "driverStateV2", "roadCameraState", "wideRoadCameraState"], None) | 
			
		
	
	
		
			
				
					|  |  |  | @ -50,33 +52,36 @@ def setup_onroad(click, pm: PubMaster): | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   vipc_server = VisionIpcServer("camerad") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   streams = [(VisionStreamType.VISION_STREAM_ROAD, CAM.fcam), | 
			
		
	
		
			
				
					|  |  |  |  |              (VisionStreamType.VISION_STREAM_DRIVER, CAM.dcam), | 
			
		
	
		
			
				
					|  |  |  |  |              (VisionStreamType.VISION_STREAM_WIDE_ROAD, CAM.ecam)] | 
			
		
	
		
			
				
					|  |  |  |  |   for stream_type, cam in streams: | 
			
		
	
		
			
				
					|  |  |  |  |     vipc_server.create_buffers(stream_type, 40, False, cam.width, cam.height) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   for stream_type, cam, _ in STREAMS: | 
			
		
	
		
			
				
					|  |  |  |  |     vipc_server.create_buffers(stream_type, 5, False, cam.width, cam.height) | 
			
		
	
		
			
				
					|  |  |  |  |   vipc_server.start_listener() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   packet_id = 0 | 
			
		
	
		
			
				
					|  |  |  |  |   for _ in range(10): | 
			
		
	
		
			
				
					|  |  |  |  |   for _ in range(30): | 
			
		
	
		
			
				
					|  |  |  |  |     for service, data in DATA.items(): | 
			
		
	
		
			
				
					|  |  |  |  |       if data: | 
			
		
	
		
			
				
					|  |  |  |  |         data.clear_write_flag() | 
			
		
	
		
			
				
					|  |  |  |  |         pm.send(service, data) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     for stream_type, cam in streams: | 
			
		
	
		
			
				
					|  |  |  |  |       IMG = np.zeros((int(cam.width*1.5), cam.height), dtype=np.uint8) | 
			
		
	
		
			
				
					|  |  |  |  |       IMG_BYTES = IMG.flatten().tobytes() | 
			
		
	
		
			
				
					|  |  |  |  |       packet_id = packet_id + 1 | 
			
		
	
		
			
				
					|  |  |  |  |       vipc_server.send(stream_type, IMG_BYTES, packet_id, packet_id, packet_id) | 
			
		
	
		
			
				
					|  |  |  |  |     packet_id = packet_id + 1 | 
			
		
	
		
			
				
					|  |  |  |  |     for stream_type, _, image in STREAMS: | 
			
		
	
		
			
				
					|  |  |  |  |       vipc_server.send(stream_type, image, packet_id, packet_id, packet_id) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     time.sleep(0.05) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def setup_onroad_wide(click, pm: PubMaster): | 
			
		
	
		
			
				
					|  |  |  |  |   DATA['controlsState'].controlsState.experimentalMode = True | 
			
		
	
		
			
				
					|  |  |  |  |   DATA["carState"].carState.vEgo = 1 | 
			
		
	
		
			
				
					|  |  |  |  |   setup_onroad(click, pm) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def setup_onroad_sidebar(click, pm: PubMaster): | 
			
		
	
		
			
				
					|  |  |  |  |   setup_onroad(click, pm) | 
			
		
	
		
			
				
					|  |  |  |  |   click(500, 500) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def setup_onroad_wide_sidebar(click, pm: PubMaster): | 
			
		
	
		
			
				
					|  |  |  |  |   setup_onroad_wide(click, pm) | 
			
		
	
		
			
				
					|  |  |  |  |   click(500, 500) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def setup_onroad_alert(click, pm: PubMaster, text1, text2, size, status=log.ControlsState.AlertStatus.normal): | 
			
		
	
		
			
				
					|  |  |  |  |   print(f'setup onroad alert, size: {size}') | 
			
		
	
	
		
			
				
					|  |  |  | @ -105,6 +110,8 @@ CASES = { | 
			
		
	
		
			
				
					|  |  |  |  |   "settings_network": setup_settings_network, | 
			
		
	
		
			
				
					|  |  |  |  |   "onroad": setup_onroad, | 
			
		
	
		
			
				
					|  |  |  |  |   "onroad_sidebar": setup_onroad_sidebar, | 
			
		
	
		
			
				
					|  |  |  |  |   "onroad_wide": setup_onroad_wide, | 
			
		
	
		
			
				
					|  |  |  |  |   "onroad_wide_sidebar": setup_onroad_wide_sidebar, | 
			
		
	
		
			
				
					|  |  |  |  |   "onroad_alert_small": setup_onroad_alert_small, | 
			
		
	
		
			
				
					|  |  |  |  |   "onroad_alert_mid": setup_onroad_alert_mid, | 
			
		
	
		
			
				
					|  |  |  |  |   "onroad_alert_full": setup_onroad_alert_full, | 
			
		
	
	
		
			
				
					|  |  |  | @ -177,7 +184,10 @@ def create_screenshots(): | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   SCREENSHOTS_DIR.mkdir(parents=True) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   lr = list(LogReader(f'{TEST_ROUTE}/1/q')) | 
			
		
	
		
			
				
					|  |  |  |  |   route = Route(TEST_ROUTE) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   segnum = 2 | 
			
		
	
		
			
				
					|  |  |  |  |   lr = LogReader(route.qlog_paths()[segnum]) | 
			
		
	
		
			
				
					|  |  |  |  |   for event in lr: | 
			
		
	
		
			
				
					|  |  |  |  |     if event.which() in DATA: | 
			
		
	
		
			
				
					|  |  |  |  |       DATA[event.which()] = event.as_builder() | 
			
		
	
	
		
			
				
					|  |  |  | @ -185,6 +195,13 @@ def create_screenshots(): | 
			
		
	
		
			
				
					|  |  |  |  |     if all(DATA.values()): | 
			
		
	
		
			
				
					|  |  |  |  |       break | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   cam = DEVICE_CAMERAS[("tici", "ar0231")] | 
			
		
	
		
			
				
					|  |  |  |  |   road_img = FrameReader(route.camera_paths()[segnum]).get(0, pix_fmt="nv12")[0] | 
			
		
	
		
			
				
					|  |  |  |  |   STREAMS.append((VisionStreamType.VISION_STREAM_ROAD, cam.fcam, road_img.flatten().tobytes())) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   wide_road_img = FrameReader(route.ecamera_paths()[segnum]).get(0, pix_fmt="nv12")[0] | 
			
		
	
		
			
				
					|  |  |  |  |   STREAMS.append((VisionStreamType.VISION_STREAM_WIDE_ROAD, cam.ecam, wide_road_img.flatten().tobytes())) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   t = TestUI() | 
			
		
	
		
			
				
					|  |  |  |  |   for name, setup in CASES.items(): | 
			
		
	
		
			
				
					|  |  |  |  |     t.test_ui(name, setup) | 
			
		
	
	
		
			
				
					|  |  |  | 
 |