|  |  | @ -121,83 +121,83 @@ class TestCarModelBase(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") |  |  |  |     self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.safety.init_tests() |  |  |  |     self.safety.init_tests() | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   # def test_car_params(self): |  |  |  |   def test_car_params(self): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   if self.CP.dashcamOnly: |  |  |  |     if self.CP.dashcamOnly: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     self.skipTest("no need to check carParams for dashcamOnly") |  |  |  |       self.skipTest("no need to check carParams for dashcamOnly") | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   # make sure car params are within a valid range |  |  |  |     # make sure car params are within a valid range | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   self.assertGreater(self.CP.mass, 1) |  |  |  |     self.assertGreater(self.CP.mass, 1) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   if self.CP.steerControlType != car.CarParams.SteerControlType.angle: |  |  |  |     if self.CP.steerControlType != car.CarParams.SteerControlType.angle: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     tuning = self.CP.lateralTuning.which() |  |  |  |       tuning = self.CP.lateralTuning.which() | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     if tuning == 'pid': |  |  |  |       if tuning == 'pid': | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       self.assertTrue(len(self.CP.lateralTuning.pid.kpV)) |  |  |  |         self.assertTrue(len(self.CP.lateralTuning.pid.kpV)) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     elif tuning == 'torque': |  |  |  |       elif tuning == 'torque': | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       self.assertTrue(self.CP.lateralTuning.torque.kf > 0) |  |  |  |         self.assertTrue(self.CP.lateralTuning.torque.kf > 0) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     elif tuning == 'indi': |  |  |  |       elif tuning == 'indi': | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV)) |  |  |  |         self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV)) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     else: |  |  |  |       else: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       raise Exception("unknown tuning") |  |  |  |         raise Exception("unknown tuning") | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # def test_car_interface(self): |  |  |  |   def test_car_interface(self): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   # TODO: also check for checksum violations from can parser |  |  |  |     # TODO: also check for checksum violations from can parser | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   can_invalid_cnt = 0 |  |  |  |     can_invalid_cnt = 0 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   can_valid = False |  |  |  |     can_valid = False | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   CC = car.CarControl.new_message() |  |  |  |     CC = car.CarControl.new_message() | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   for i, msg in enumerate(self.can_msgs): |  |  |  |     for i, msg in enumerate(self.can_msgs): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     CS = self.CI.update(CC, (msg.as_builder().to_bytes(),)) |  |  |  |       CS = self.CI.update(CC, (msg.as_builder().to_bytes(),)) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     self.CI.apply(CC) |  |  |  |       self.CI.apply(CC) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     if CS.canValid: |  |  |  |       if CS.canValid: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       can_valid = True |  |  |  |         can_valid = True | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     # wait max of 2s for low frequency msgs to be seen |  |  |  |       # wait max of 2s for low frequency msgs to be seen | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     if i > 200 or can_valid: |  |  |  |       if i > 200 or can_valid: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       can_invalid_cnt += not CS.canValid |  |  |  |         can_invalid_cnt += not CS.canValid | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   self.assertEqual(can_invalid_cnt, 0) |  |  |  |     self.assertEqual(can_invalid_cnt, 0) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # def test_radar_interface(self): |  |  |  |   def test_radar_interface(self): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   os.environ['NO_RADAR_SLEEP'] = "1" |  |  |  |     os.environ['NO_RADAR_SLEEP'] = "1" | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface |  |  |  |     RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   RI = RadarInterface(self.CP) |  |  |  |     RI = RadarInterface(self.CP) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   assert RI |  |  |  |     assert RI | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   error_cnt = 0 |  |  |  |     error_cnt = 0 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   for i, msg in enumerate(self.can_msgs): |  |  |  |     for i, msg in enumerate(self.can_msgs): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     rr = RI.update((msg.as_builder().to_bytes(),)) |  |  |  |       rr = RI.update((msg.as_builder().to_bytes(),)) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     if rr is not None and i > 50: |  |  |  |       if rr is not None and i > 50: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       error_cnt += car.RadarData.Error.canError in rr.errors |  |  |  |         error_cnt += car.RadarData.Error.canError in rr.errors | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   self.assertEqual(error_cnt, 0) |  |  |  |     self.assertEqual(error_cnt, 0) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # def test_panda_safety_rx_valid(self): |  |  |  |   def test_panda_safety_rx_valid(self): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   if self.CP.dashcamOnly: |  |  |  |     if self.CP.dashcamOnly: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     self.skipTest("no need to check panda safety for dashcamOnly") |  |  |  |       self.skipTest("no need to check panda safety for dashcamOnly") | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   start_ts = self.can_msgs[0].logMonoTime |  |  |  |     start_ts = self.can_msgs[0].logMonoTime | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   failed_addrs = Counter() |  |  |  |     failed_addrs = Counter() | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   for can in self.can_msgs: |  |  |  |     for can in self.can_msgs: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     # update panda timer |  |  |  |       # update panda timer | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     t = (can.logMonoTime - start_ts) / 1e3 |  |  |  |       t = (can.logMonoTime - start_ts) / 1e3 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     self.safety.set_timer(int(t)) |  |  |  |       self.safety.set_timer(int(t)) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     # run all msgs through the safety RX hook |  |  |  |       # run all msgs through the safety RX hook | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     for msg in can.can: |  |  |  |       for msg in can.can: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       if msg.src >= 64: |  |  |  |         if msg.src >= 64: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #         continue |  |  |  |           continue | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       to_send = package_can_msg([msg.address, 0, msg.dat, msg.src % 4]) |  |  |  |         to_send = package_can_msg([msg.address, 0, msg.dat, msg.src % 4]) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       if self.safety.safety_rx_hook(to_send) != 1: |  |  |  |         if self.safety.safety_rx_hook(to_send) != 1: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #         failed_addrs[hex(msg.address)] += 1 |  |  |  |           failed_addrs[hex(msg.address)] += 1 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   # |  |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     # ensure all msgs defined in the addr checks are valid |  |  |  |       # ensure all msgs defined in the addr checks are valid | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #     if self.car_model not in ignore_addr_checks_valid: |  |  |  |       if self.car_model not in ignore_addr_checks_valid: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       self.safety.safety_tick_current_rx_checks() |  |  |  |         self.safety.safety_tick_current_rx_checks() | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #       if t > 1e6: |  |  |  |         if t > 1e6: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #         self.assertTrue(self.safety.addr_checks_valid()) |  |  |  |           self.assertTrue(self.safety.addr_checks_valid()) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   #   self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") |  |  |  |     self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def test_panda_safety_carstate(self): |  |  |  |   def test_panda_safety_carstate(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     """ |  |  |  |     """ | 
			
		
	
	
		
		
			
				
					|  |  | 
 |