|  |  | @ -1,4 +1,5 @@ | 
			
		
	
		
		
			
				
					
					|  |  |  | from collections import namedtuple |  |  |  | from collections import namedtuple | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | import capnp | 
			
		
	
		
		
			
				
					
					|  |  |  | import pathlib |  |  |  | import pathlib | 
			
		
	
		
		
			
				
					
					|  |  |  | import shutil |  |  |  | import shutil | 
			
		
	
		
		
			
				
					
					|  |  |  | import sys |  |  |  | import sys | 
			
		
	
	
		
		
			
				
					|  |  | @ -9,38 +10,26 @@ import os | 
			
		
	
		
		
			
				
					
					|  |  |  | import pywinctl |  |  |  | import pywinctl | 
			
		
	
		
		
			
				
					
					|  |  |  | import time |  |  |  | import time | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | from cereal import messaging, car, log |  |  |  | from cereal import messaging, log | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | from msgq.visionipc import VisionIpcServer, VisionStreamType |  |  |  | from msgq.visionipc import VisionIpcServer, VisionStreamType | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | from cereal.messaging import SubMaster, PubMaster |  |  |  | from cereal.messaging import SubMaster, PubMaster | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.common.params import Params |  |  |  | from openpilot.common.params import Params | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.common.realtime import DT_MDL |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.common.transformations.camera import DEVICE_CAMERAS |  |  |  | from openpilot.common.transformations.camera import DEVICE_CAMERAS | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.test.helpers import with_processes |  |  |  | from openpilot.selfdrive.test.helpers import with_processes | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera_state |  |  |  | from openpilot.tools.lib.logreader import LogReader | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | UI_DELAY = 0.5 # may be slower on CI? |  |  |  | UI_DELAY = 0.5 # may be slower on CI? | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | TEST_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19" | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | NetworkType = log.DeviceState.NetworkType |  |  |  | CAM = DEVICE_CAMERAS[("tici", "ar0231")] | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | NetworkStrength = log.DeviceState.NetworkStrength |  |  |  | DATA: dict[str, capnp.lib.capnp._DynamicStructBuilder | None] = dict.fromkeys( | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |   ["deviceState", "pandaStates", "controlsState", "liveCalibration", | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | EventName = car.CarEvent.EventName |  |  |  |   "modelV2", "radarState", "driverMonitoringState", | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | EVENTS_BY_NAME = {v: k for k, v in EventName.schema.enumerants.items()} |  |  |  |   "carState", "driverStateV2", "roadCameraState", "wideRoadCameraState"], None) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def setup_common(click, pm: PubMaster): |  |  |  | def setup_common(click, pm: PubMaster): | 
			
		
	
		
		
			
				
					
					|  |  |  |   Params().put("DongleId", "123456789012345") |  |  |  |   Params().put("DongleId", "123456789012345") | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat = messaging.new_message('deviceState') |  |  |  |   pm.send('deviceState', DATA['deviceState']) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.started = True |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.networkType = NetworkType.cell4G |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.networkStrength = NetworkStrength.moderate |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.freeSpacePercent = 80 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.memoryUsagePercent = 2 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.cpuTempC = [2,]*3 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.gpuTempC = [2,]*3 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.deviceState.cpuUsagePercent = [2,]*8 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   pm.send("deviceState", dat) |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def setup_homescreen(click, pm: PubMaster): |  |  |  | def setup_homescreen(click, pm: PubMaster): | 
			
		
	
		
		
			
				
					
					|  |  |  |   setup_common(click, pm) |  |  |  |   setup_common(click, pm) | 
			
		
	
	
		
		
			
				
					|  |  | @ -59,37 +48,30 @@ def setup_settings_network(click, pm: PubMaster): | 
			
		
	
		
		
			
				
					
					|  |  |  | def setup_onroad(click, pm: PubMaster): |  |  |  | def setup_onroad(click, pm: PubMaster): | 
			
		
	
		
		
			
				
					
					|  |  |  |   setup_common(click, pm) |  |  |  |   setup_common(click, pm) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat = messaging.new_message('pandaStates', 1) |  |  |  |   vipc_server = VisionIpcServer("camerad") | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   dat.pandaStates[0].ignitionLine = True |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   dat.pandaStates[0].pandaType = log.PandaState.PandaType.uno |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   pm.send("pandaStates", dat) |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   d = DEVICE_CAMERAS[("tici", "ar0231")] |  |  |  |   streams = [(VisionStreamType.VISION_STREAM_ROAD, CAM.fcam), | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   server = VisionIpcServer("camerad") |  |  |  |              (VisionStreamType.VISION_STREAM_DRIVER, CAM.dcam), | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   server.create_buffers(VisionStreamType.VISION_STREAM_ROAD, 40, False, d.fcam.width, d.fcam.height) |  |  |  |              (VisionStreamType.VISION_STREAM_WIDE_ROAD, CAM.ecam)] | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   server.create_buffers(VisionStreamType.VISION_STREAM_DRIVER, 40, False, d.dcam.width, d.dcam.height) |  |  |  |   for stream_type, cam in streams: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   server.create_buffers(VisionStreamType.VISION_STREAM_WIDE_ROAD, 40, False, d.fcam.width, d.fcam.height) |  |  |  |     vipc_server.create_buffers(stream_type, 40, False, cam.width, cam.height) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   server.start_listener() |  |  |  |  | 
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   time.sleep(0.5) # give time for vipc server to start |  |  |  |   vipc_server.start_listener() | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   IMG = np.zeros((int(d.fcam.width*1.5), d.fcam.height), dtype=np.uint8) |  |  |  |   packet_id = 0 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   IMG_BYTES = IMG.flatten().tobytes() |  |  |  |   for _ in range(10): | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     for service, data in DATA.items(): | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       if data: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         data.clear_write_flag() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         pm.send(service, data) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   cams = ('roadCameraState', 'wideRoadCameraState') |  |  |  |     for stream_type, cam in streams: | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       IMG = np.zeros((int(cam.width*1.5), cam.height), dtype=np.uint8) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       IMG_BYTES = IMG.flatten().tobytes() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       packet_id = packet_id + 1 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       vipc_server.send(stream_type, IMG_BYTES, packet_id, packet_id, packet_id) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   frame_id = 0 |  |  |  |     time.sleep(0.05) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   for cam in cams: |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     msg = messaging.new_message(cam) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     cs = getattr(msg, cam) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     cs.frameId = frame_id |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     cs.timestampSof = int((frame_id * DT_MDL) * 1e9) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     cs.timestampEof = int((frame_id * DT_MDL) * 1e9) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     cam_meta = meta_from_camera_state(cam) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     pm.send(msg.which(), msg) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     server.send(cam_meta.stream, IMG_BYTES, cs.frameId, cs.timestampSof, cs.timestampEof) |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def setup_onroad_sidebar(click, pm: PubMaster): |  |  |  | def setup_onroad_sidebar(click, pm: PubMaster): | 
			
		
	
		
		
			
				
					
					|  |  |  |   setup_onroad(click, pm) |  |  |  |   setup_onroad(click, pm) | 
			
		
	
	
		
		
			
				
					|  |  | @ -141,7 +123,7 @@ class TestUI: | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def setup(self): |  |  |  |   def setup(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.sm = SubMaster(["uiDebug"]) |  |  |  |     self.sm = SubMaster(["uiDebug"]) | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.pm = PubMaster(["deviceState", "pandaStates", "controlsState", 'roadCameraState', 'wideRoadCameraState']) |  |  |  |     self.pm = PubMaster(list(DATA.keys())) | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |     while not self.sm.valid["uiDebug"]: |  |  |  |     while not self.sm.valid["uiDebug"]: | 
			
		
	
		
		
			
				
					
					|  |  |  |       self.sm.update(1) |  |  |  |       self.sm.update(1) | 
			
		
	
		
		
			
				
					
					|  |  |  |     time.sleep(UI_DELAY) # wait a bit more for the UI to start rendering |  |  |  |     time.sleep(UI_DELAY) # wait a bit more for the UI to start rendering | 
			
		
	
	
		
		
			
				
					|  |  | @ -195,6 +177,14 @@ def create_screenshots(): | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   SCREENSHOTS_DIR.mkdir(parents=True) |  |  |  |   SCREENSHOTS_DIR.mkdir(parents=True) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   lr = list(LogReader(f'{TEST_ROUTE}/1/q')) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   for event in lr: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     if event.which() in DATA: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       DATA[event.which()] = event.as_builder() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     if all(DATA.values()): | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       break | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   t = TestUI() |  |  |  |   t = TestUI() | 
			
		
	
		
		
			
				
					
					|  |  |  |   for name, setup in CASES.items(): |  |  |  |   for name, setup in CASES.items(): | 
			
		
	
		
		
			
				
					
					|  |  |  |     t.test_ui(name, setup) |  |  |  |     t.test_ui(name, setup) | 
			
		
	
	
		
		
			
				
					|  |  | 
 |