|  |  |  | @ -21,39 +21,40 @@ CAMERAS = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState') | 
			
		
	
		
			
				
					|  |  |  |  | @flaky(max_runs=3) | 
			
		
	
		
			
				
					|  |  |  |  | @pytest.mark.tici | 
			
		
	
		
			
				
					|  |  |  |  | class TestCamerad: | 
			
		
	
		
			
				
					|  |  |  |  |   def setup_method(self): | 
			
		
	
		
			
				
					|  |  |  |  |   @classmethod | 
			
		
	
		
			
				
					|  |  |  |  |   def setup_class(cls): | 
			
		
	
		
			
				
					|  |  |  |  |     # run camerad and record logs | 
			
		
	
		
			
				
					|  |  |  |  |     managed_processes['camerad'].start() | 
			
		
	
		
			
				
					|  |  |  |  |     time.sleep(3) | 
			
		
	
		
			
				
					|  |  |  |  |     socks = {c: messaging.sub_sock(c, conflate=False, timeout=100) for c in CAMERAS} | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     self.logs = defaultdict(list) | 
			
		
	
		
			
				
					|  |  |  |  |     cls.logs = defaultdict(list) | 
			
		
	
		
			
				
					|  |  |  |  |     start_time = time.monotonic() | 
			
		
	
		
			
				
					|  |  |  |  |     while time.monotonic()- start_time < TEST_TIMESPAN: | 
			
		
	
		
			
				
					|  |  |  |  |       for cam, s in socks.items(): | 
			
		
	
		
			
				
					|  |  |  |  |         self.logs[cam] += messaging.drain_sock(s) | 
			
		
	
		
			
				
					|  |  |  |  |         cls.logs[cam] += messaging.drain_sock(s) | 
			
		
	
		
			
				
					|  |  |  |  |       time.sleep(0.2) | 
			
		
	
		
			
				
					|  |  |  |  |     managed_processes['camerad'].stop() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     self.log_by_frame_id = defaultdict(list) | 
			
		
	
		
			
				
					|  |  |  |  |     self.sensor_type = None | 
			
		
	
		
			
				
					|  |  |  |  |     for cam, msgs in self.logs.items(): | 
			
		
	
		
			
				
					|  |  |  |  |       if self.sensor_type is None: | 
			
		
	
		
			
				
					|  |  |  |  |         self.sensor_type = getattr(msgs[0], msgs[0].which()).sensor.raw | 
			
		
	
		
			
				
					|  |  |  |  |     cls.log_by_frame_id = defaultdict(list) | 
			
		
	
		
			
				
					|  |  |  |  |     cls.sensor_type = None | 
			
		
	
		
			
				
					|  |  |  |  |     for cam, msgs in cls.logs.items(): | 
			
		
	
		
			
				
					|  |  |  |  |       if cls.sensor_type is None: | 
			
		
	
		
			
				
					|  |  |  |  |         cls.sensor_type = getattr(msgs[0], msgs[0].which()).sensor.raw | 
			
		
	
		
			
				
					|  |  |  |  |       expected_frames = SERVICE_LIST[cam].frequency * TEST_TIMESPAN | 
			
		
	
		
			
				
					|  |  |  |  |       assert expected_frames*0.95 < len(msgs) < expected_frames*1.05, f"unexpected frame count {cam}: {expected_frames=}, got {len(msgs)}" | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       dts = np.abs(np.diff([getattr(m, m.which()).timestampSof/1e6 for m in msgs]) - 1000/SERVICE_LIST[cam].frequency) | 
			
		
	
		
			
				
					|  |  |  |  |       assert (dts < FRAME_DELTA_TOLERANCE[self.sensor_type]).all(), f"{cam} dts(ms) out of spec: max diff {dts.max()}, 99 percentile {np.percentile(dts, 99)}" | 
			
		
	
		
			
				
					|  |  |  |  |       assert (dts < FRAME_DELTA_TOLERANCE[cls.sensor_type]).all(), f"{cam} dts(ms) out of spec: max diff {dts.max()}, 99 percentile {np.percentile(dts, 99)}" | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       for m in msgs: | 
			
		
	
		
			
				
					|  |  |  |  |         self.log_by_frame_id[getattr(m, m.which()).frameId].append(m) | 
			
		
	
		
			
				
					|  |  |  |  |         cls.log_by_frame_id[getattr(m, m.which()).frameId].append(m) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # strip beginning and end | 
			
		
	
		
			
				
					|  |  |  |  |     for _ in range(3): | 
			
		
	
		
			
				
					|  |  |  |  |       mn, mx = min(self.log_by_frame_id.keys()), max(self.log_by_frame_id.keys()) | 
			
		
	
		
			
				
					|  |  |  |  |       del self.log_by_frame_id[mn] | 
			
		
	
		
			
				
					|  |  |  |  |       del self.log_by_frame_id[mx] | 
			
		
	
		
			
				
					|  |  |  |  |       mn, mx = min(cls.log_by_frame_id.keys()), max(cls.log_by_frame_id.keys()) | 
			
		
	
		
			
				
					|  |  |  |  |       del cls.log_by_frame_id[mn] | 
			
		
	
		
			
				
					|  |  |  |  |       del cls.log_by_frame_id[mx] | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def test_frame_skips(self): | 
			
		
	
		
			
				
					|  |  |  |  |     skips = {} | 
			
		
	
	
		
			
				
					|  |  |  | 
 |