|  |  |  | @ -180,6 +180,132 @@ class TestCarModelBase(unittest.TestCase): | 
			
		
	
		
			
				
					|  |  |  |  |     self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") | 
			
		
	
		
			
				
					|  |  |  |  |     self.safety.init_tests() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def test_car_params(self): | 
			
		
	
		
			
				
					|  |  |  |  |     raise unittest.SkipTest | 
			
		
	
		
			
				
					|  |  |  |  |     if self.CP.dashcamOnly: | 
			
		
	
		
			
				
					|  |  |  |  |       self.skipTest("no need to check carParams for dashcamOnly") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # make sure car params are within a valid range | 
			
		
	
		
			
				
					|  |  |  |  |     self.assertGreater(self.CP.mass, 1) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     if self.CP.steerControlType != car.CarParams.SteerControlType.angle: | 
			
		
	
		
			
				
					|  |  |  |  |       tuning = self.CP.lateralTuning.which() | 
			
		
	
		
			
				
					|  |  |  |  |       if tuning == 'pid': | 
			
		
	
		
			
				
					|  |  |  |  |         self.assertTrue(len(self.CP.lateralTuning.pid.kpV)) | 
			
		
	
		
			
				
					|  |  |  |  |       elif tuning == 'torque': | 
			
		
	
		
			
				
					|  |  |  |  |         self.assertTrue(self.CP.lateralTuning.torque.kf > 0) | 
			
		
	
		
			
				
					|  |  |  |  |       else: | 
			
		
	
		
			
				
					|  |  |  |  |         raise Exception("unknown tuning") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def test_car_interface(self): | 
			
		
	
		
			
				
					|  |  |  |  |     raise unittest.SkipTest | 
			
		
	
		
			
				
					|  |  |  |  |     # TODO: also check for checksum violations from can parser | 
			
		
	
		
			
				
					|  |  |  |  |     can_invalid_cnt = 0 | 
			
		
	
		
			
				
					|  |  |  |  |     can_valid = False | 
			
		
	
		
			
				
					|  |  |  |  |     CC = car.CarControl.new_message() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     for i, msg in enumerate(self.can_msgs): | 
			
		
	
		
			
				
					|  |  |  |  |       CS = self.CI.update(CC, (msg.as_builder().to_bytes(),)) | 
			
		
	
		
			
				
					|  |  |  |  |       self.CI.apply(CC, msg.logMonoTime) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       if CS.canValid: | 
			
		
	
		
			
				
					|  |  |  |  |         can_valid = True | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # wait max of 2s for low frequency msgs to be seen | 
			
		
	
		
			
				
					|  |  |  |  |       if i > 200 or can_valid: | 
			
		
	
		
			
				
					|  |  |  |  |         can_invalid_cnt += not CS.canValid | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     self.assertEqual(can_invalid_cnt, 0) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def test_radar_interface(self): | 
			
		
	
		
			
				
					|  |  |  |  |     raise unittest.SkipTest | 
			
		
	
		
			
				
					|  |  |  |  |     os.environ['NO_RADAR_SLEEP'] = "1" | 
			
		
	
		
			
				
					|  |  |  |  |     RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface | 
			
		
	
		
			
				
					|  |  |  |  |     RI = RadarInterface(self.CP) | 
			
		
	
		
			
				
					|  |  |  |  |     assert RI | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # Since OBD port is multiplexed to bus 1 (commonly radar bus) while fingerprinting, | 
			
		
	
		
			
				
					|  |  |  |  |     # start parsing CAN messages after we've left ELM mode and can expect CAN traffic | 
			
		
	
		
			
				
					|  |  |  |  |     error_cnt = 0 | 
			
		
	
		
			
				
					|  |  |  |  |     for i, msg in enumerate(self.can_msgs[self.elm_frame:]): | 
			
		
	
		
			
				
					|  |  |  |  |       rr = RI.update((msg.as_builder().to_bytes(),)) | 
			
		
	
		
			
				
					|  |  |  |  |       if rr is not None and i > 50: | 
			
		
	
		
			
				
					|  |  |  |  |         error_cnt += car.RadarData.Error.canError in rr.errors | 
			
		
	
		
			
				
					|  |  |  |  |     self.assertEqual(error_cnt, 0) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def test_panda_safety_rx_valid(self): | 
			
		
	
		
			
				
					|  |  |  |  |     raise unittest.SkipTest | 
			
		
	
		
			
				
					|  |  |  |  |     if self.CP.dashcamOnly: | 
			
		
	
		
			
				
					|  |  |  |  |       self.skipTest("no need to check panda safety for dashcamOnly") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     start_ts = self.can_msgs[0].logMonoTime | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     failed_addrs = Counter() | 
			
		
	
		
			
				
					|  |  |  |  |     for can in self.can_msgs: | 
			
		
	
		
			
				
					|  |  |  |  |       # update panda timer | 
			
		
	
		
			
				
					|  |  |  |  |       t = (can.logMonoTime - start_ts) / 1e3 | 
			
		
	
		
			
				
					|  |  |  |  |       self.safety.set_timer(int(t)) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # run all msgs through the safety RX hook | 
			
		
	
		
			
				
					|  |  |  |  |       for msg in can.can: | 
			
		
	
		
			
				
					|  |  |  |  |         if msg.src >= 64: | 
			
		
	
		
			
				
					|  |  |  |  |           continue | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) | 
			
		
	
		
			
				
					|  |  |  |  |         if self.safety.safety_rx_hook(to_send) != 1: | 
			
		
	
		
			
				
					|  |  |  |  |           failed_addrs[hex(msg.address)] += 1 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # ensure all msgs defined in the addr checks are valid | 
			
		
	
		
			
				
					|  |  |  |  |       self.safety.safety_tick_current_safety_config() | 
			
		
	
		
			
				
					|  |  |  |  |       if t > 1e6: | 
			
		
	
		
			
				
					|  |  |  |  |         self.assertTrue(self.safety.safety_config_valid()) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # Don't check relay malfunction on disabled routes (relay closed), | 
			
		
	
		
			
				
					|  |  |  |  |       # or before fingerprinting is done (elm327 and noOutput) | 
			
		
	
		
			
				
					|  |  |  |  |       if self.openpilot_enabled and t / 1e4 > self.car_safety_mode_frame: | 
			
		
	
		
			
				
					|  |  |  |  |         self.assertFalse(self.safety.get_relay_malfunction()) | 
			
		
	
		
			
				
					|  |  |  |  |       else: | 
			
		
	
		
			
				
					|  |  |  |  |         self.safety.set_relay_malfunction(False) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def test_panda_safety_tx_cases(self, data=None): | 
			
		
	
		
			
				
					|  |  |  |  |     raise unittest.SkipTest | 
			
		
	
		
			
				
					|  |  |  |  |     """Asserts we can tx common messages""" | 
			
		
	
		
			
				
					|  |  |  |  |     if self.CP.notCar: | 
			
		
	
		
			
				
					|  |  |  |  |       self.skipTest("Skipping test for notCar") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     def test_car_controller(car_control): | 
			
		
	
		
			
				
					|  |  |  |  |       now_nanos = 0 | 
			
		
	
		
			
				
					|  |  |  |  |       msgs_sent = 0 | 
			
		
	
		
			
				
					|  |  |  |  |       CI = self.CarInterface(self.CP, self.CarController, self.CarState) | 
			
		
	
		
			
				
					|  |  |  |  |       for _ in range(round(10.0 / DT_CTRL)):  # make sure we hit the slowest messages | 
			
		
	
		
			
				
					|  |  |  |  |         CI.update(car_control, []) | 
			
		
	
		
			
				
					|  |  |  |  |         _, sendcan = CI.apply(car_control, now_nanos) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         now_nanos += DT_CTRL * 1e9 | 
			
		
	
		
			
				
					|  |  |  |  |         msgs_sent += len(sendcan) | 
			
		
	
		
			
				
					|  |  |  |  |         for addr, _, dat, bus in sendcan: | 
			
		
	
		
			
				
					|  |  |  |  |           to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat) | 
			
		
	
		
			
				
					|  |  |  |  |           self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus)) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # Make sure we attempted to send messages | 
			
		
	
		
			
				
					|  |  |  |  |       self.assertGreater(msgs_sent, 50) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # Make sure we can send all messages while inactive | 
			
		
	
		
			
				
					|  |  |  |  |     CC = car.CarControl.new_message() | 
			
		
	
		
			
				
					|  |  |  |  |     test_car_controller(CC) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # Test cancel + general messages (controls_allowed=False & cruise_engaged=True) | 
			
		
	
		
			
				
					|  |  |  |  |     self.safety.set_cruise_engaged_prev(True) | 
			
		
	
		
			
				
					|  |  |  |  |     CC = car.CarControl.new_message(cruiseControl={'cancel': True}) | 
			
		
	
		
			
				
					|  |  |  |  |     test_car_controller(CC) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # Test resume + general messages (controls_allowed=True & cruise_engaged=True) | 
			
		
	
		
			
				
					|  |  |  |  |     self.safety.set_controls_allowed(True) | 
			
		
	
		
			
				
					|  |  |  |  |     CC = car.CarControl.new_message(cruiseControl={'resume': True}) | 
			
		
	
		
			
				
					|  |  |  |  |     test_car_controller(CC) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   @settings(max_examples=100, deadline=None, | 
			
		
	
		
			
				
					|  |  |  |  |             phases=(Phase.reuse, Phase.generate,), | 
			
		
	
		
			
				
					|  |  |  |  |             suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow, HealthCheck.large_base_example], | 
			
		
	
	
		
			
				
					|  |  |  | @ -287,88 +413,89 @@ class TestCarModelBase(unittest.TestCase): | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     del msgs | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   # def test_panda_safety_carstate(self): | 
			
		
	
		
			
				
					|  |  |  |  |   #   """ | 
			
		
	
		
			
				
					|  |  |  |  |   #     Assert that panda safety matches openpilot's carState | 
			
		
	
		
			
				
					|  |  |  |  |   #   """ | 
			
		
	
		
			
				
					|  |  |  |  |   #   if self.CP.dashcamOnly: | 
			
		
	
		
			
				
					|  |  |  |  |   #     self.skipTest("no need to check panda safety for dashcamOnly") | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #   CC = car.CarControl.new_message() | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #   # warm up pass, as initial states may be different | 
			
		
	
		
			
				
					|  |  |  |  |   #   for can in self.can_msgs[:300]: | 
			
		
	
		
			
				
					|  |  |  |  |   #     self.CI.update(CC, (can.as_builder().to_bytes(), )) | 
			
		
	
		
			
				
					|  |  |  |  |   #     for msg in filter(lambda m: m.src in range(64), can.can): | 
			
		
	
		
			
				
					|  |  |  |  |   #       to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) | 
			
		
	
		
			
				
					|  |  |  |  |   #       self.safety.safety_rx_hook(to_send) | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #   controls_allowed_prev = False | 
			
		
	
		
			
				
					|  |  |  |  |   #   CS_prev = car.CarState.new_message() | 
			
		
	
		
			
				
					|  |  |  |  |   #   checks = defaultdict(lambda: 0) | 
			
		
	
		
			
				
					|  |  |  |  |   #   controlsd = Controls(CI=self.CI) | 
			
		
	
		
			
				
					|  |  |  |  |   #   controlsd.initialized = True | 
			
		
	
		
			
				
					|  |  |  |  |   #   for idx, can in enumerate(self.can_msgs): | 
			
		
	
		
			
				
					|  |  |  |  |   #     CS = self.CI.update(CC, (can.as_builder().to_bytes(), )) | 
			
		
	
		
			
				
					|  |  |  |  |   #     for msg in filter(lambda m: m.src in range(64), can.can): | 
			
		
	
		
			
				
					|  |  |  |  |   #       to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) | 
			
		
	
		
			
				
					|  |  |  |  |   #       ret = self.safety.safety_rx_hook(to_send) | 
			
		
	
		
			
				
					|  |  |  |  |   #       self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}") | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #     # Skip first frame so CS_prev is properly initialized | 
			
		
	
		
			
				
					|  |  |  |  |   #     if idx == 0: | 
			
		
	
		
			
				
					|  |  |  |  |   #       CS_prev = CS | 
			
		
	
		
			
				
					|  |  |  |  |   #       # Button may be left pressed in warm up period | 
			
		
	
		
			
				
					|  |  |  |  |   #       if not self.CP.pcmCruise: | 
			
		
	
		
			
				
					|  |  |  |  |   #         self.safety.set_controls_allowed(0) | 
			
		
	
		
			
				
					|  |  |  |  |   #       continue | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #     # TODO: check rest of panda's carstate (steering, ACC main on, etc.) | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #     checks['gasPressed'] += CS.gasPressed != self.safety.get_gas_pressed_prev() | 
			
		
	
		
			
				
					|  |  |  |  |   #     checks['standstill'] += CS.standstill == self.safety.get_vehicle_moving() | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #     # TODO: remove this exception once this mismatch is resolved | 
			
		
	
		
			
				
					|  |  |  |  |   #     brake_pressed = CS.brakePressed | 
			
		
	
		
			
				
					|  |  |  |  |   #     if CS.brakePressed and not self.safety.get_brake_pressed_prev(): | 
			
		
	
		
			
				
					|  |  |  |  |   #       if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05: | 
			
		
	
		
			
				
					|  |  |  |  |   #         brake_pressed = False | 
			
		
	
		
			
				
					|  |  |  |  |   #     checks['brakePressed'] += brake_pressed != self.safety.get_brake_pressed_prev() | 
			
		
	
		
			
				
					|  |  |  |  |   #     checks['regenBraking'] += CS.regenBraking != self.safety.get_regen_braking_prev() | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #     if self.CP.pcmCruise: | 
			
		
	
		
			
				
					|  |  |  |  |   #       # On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state. | 
			
		
	
		
			
				
					|  |  |  |  |   #       # On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but | 
			
		
	
		
			
				
					|  |  |  |  |   #       # openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages). | 
			
		
	
		
			
				
					|  |  |  |  |   #       if self.CP.carName == "honda" and self.CP.carFingerprint not in HONDA_BOSCH: | 
			
		
	
		
			
				
					|  |  |  |  |   #         # only the rising edges are expected to match | 
			
		
	
		
			
				
					|  |  |  |  |   #         if CS.cruiseState.enabled and not CS_prev.cruiseState.enabled: | 
			
		
	
		
			
				
					|  |  |  |  |   #           checks['controlsAllowed'] += not self.safety.get_controls_allowed() | 
			
		
	
		
			
				
					|  |  |  |  |   #       else: | 
			
		
	
		
			
				
					|  |  |  |  |   #         checks['controlsAllowed'] += not CS.cruiseState.enabled and self.safety.get_controls_allowed() | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #       # TODO: fix notCar mismatch | 
			
		
	
		
			
				
					|  |  |  |  |   #       if not self.CP.notCar: | 
			
		
	
		
			
				
					|  |  |  |  |   #         checks['cruiseState'] += CS.cruiseState.enabled != self.safety.get_cruise_engaged_prev() | 
			
		
	
		
			
				
					|  |  |  |  |   #     else: | 
			
		
	
		
			
				
					|  |  |  |  |   #       # Check for enable events on rising edge of controls allowed | 
			
		
	
		
			
				
					|  |  |  |  |   #       controlsd.update_events(CS) | 
			
		
	
		
			
				
					|  |  |  |  |   #       controlsd.CS_prev = CS | 
			
		
	
		
			
				
					|  |  |  |  |   #       button_enable = (any(evt.enable for evt in CS.events) and | 
			
		
	
		
			
				
					|  |  |  |  |   #                        not any(evt == EventName.pedalPressed for evt in controlsd.events.names)) | 
			
		
	
		
			
				
					|  |  |  |  |   #       mismatch = button_enable != (self.safety.get_controls_allowed() and not controls_allowed_prev) | 
			
		
	
		
			
				
					|  |  |  |  |   #       checks['controlsAllowed'] += mismatch | 
			
		
	
		
			
				
					|  |  |  |  |   #       controls_allowed_prev = self.safety.get_controls_allowed() | 
			
		
	
		
			
				
					|  |  |  |  |   #       if button_enable and not mismatch: | 
			
		
	
		
			
				
					|  |  |  |  |   #         self.safety.set_controls_allowed(False) | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #     if self.CP.carName == "honda": | 
			
		
	
		
			
				
					|  |  |  |  |   #       checks['mainOn'] += CS.cruiseState.available != self.safety.get_acc_main_on() | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #     CS_prev = CS | 
			
		
	
		
			
				
					|  |  |  |  |   # | 
			
		
	
		
			
				
					|  |  |  |  |   #   failed_checks = {k: v for k, v in checks.items() if v > 0} | 
			
		
	
		
			
				
					|  |  |  |  |   #   self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}") | 
			
		
	
		
			
				
					|  |  |  |  |   def test_panda_safety_carstate(self): | 
			
		
	
		
			
				
					|  |  |  |  |     raise unittest.SkipTest | 
			
		
	
		
			
				
					|  |  |  |  |     """ | 
			
		
	
		
			
				
					|  |  |  |  |       Assert that panda safety matches openpilot's carState | 
			
		
	
		
			
				
					|  |  |  |  |     """ | 
			
		
	
		
			
				
					|  |  |  |  |     if self.CP.dashcamOnly: | 
			
		
	
		
			
				
					|  |  |  |  |       self.skipTest("no need to check panda safety for dashcamOnly") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     CC = car.CarControl.new_message() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # warm up pass, as initial states may be different | 
			
		
	
		
			
				
					|  |  |  |  |     for can in self.can_msgs[:300]: | 
			
		
	
		
			
				
					|  |  |  |  |       self.CI.update(CC, (can.as_builder().to_bytes(), )) | 
			
		
	
		
			
				
					|  |  |  |  |       for msg in filter(lambda m: m.src in range(64), can.can): | 
			
		
	
		
			
				
					|  |  |  |  |         to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) | 
			
		
	
		
			
				
					|  |  |  |  |         self.safety.safety_rx_hook(to_send) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     controls_allowed_prev = False | 
			
		
	
		
			
				
					|  |  |  |  |     CS_prev = car.CarState.new_message() | 
			
		
	
		
			
				
					|  |  |  |  |     checks = defaultdict(lambda: 0) | 
			
		
	
		
			
				
					|  |  |  |  |     controlsd = Controls(CI=self.CI) | 
			
		
	
		
			
				
					|  |  |  |  |     controlsd.initialized = True | 
			
		
	
		
			
				
					|  |  |  |  |     for idx, can in enumerate(self.can_msgs): | 
			
		
	
		
			
				
					|  |  |  |  |       CS = self.CI.update(CC, (can.as_builder().to_bytes(), )) | 
			
		
	
		
			
				
					|  |  |  |  |       for msg in filter(lambda m: m.src in range(64), can.can): | 
			
		
	
		
			
				
					|  |  |  |  |         to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) | 
			
		
	
		
			
				
					|  |  |  |  |         ret = self.safety.safety_rx_hook(to_send) | 
			
		
	
		
			
				
					|  |  |  |  |         self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # Skip first frame so CS_prev is properly initialized | 
			
		
	
		
			
				
					|  |  |  |  |       if idx == 0: | 
			
		
	
		
			
				
					|  |  |  |  |         CS_prev = CS | 
			
		
	
		
			
				
					|  |  |  |  |         # Button may be left pressed in warm up period | 
			
		
	
		
			
				
					|  |  |  |  |         if not self.CP.pcmCruise: | 
			
		
	
		
			
				
					|  |  |  |  |           self.safety.set_controls_allowed(0) | 
			
		
	
		
			
				
					|  |  |  |  |         continue | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # TODO: check rest of panda's carstate (steering, ACC main on, etc.) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       checks['gasPressed'] += CS.gasPressed != self.safety.get_gas_pressed_prev() | 
			
		
	
		
			
				
					|  |  |  |  |       checks['standstill'] += CS.standstill == self.safety.get_vehicle_moving() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # TODO: remove this exception once this mismatch is resolved | 
			
		
	
		
			
				
					|  |  |  |  |       brake_pressed = CS.brakePressed | 
			
		
	
		
			
				
					|  |  |  |  |       if CS.brakePressed and not self.safety.get_brake_pressed_prev(): | 
			
		
	
		
			
				
					|  |  |  |  |         if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05: | 
			
		
	
		
			
				
					|  |  |  |  |           brake_pressed = False | 
			
		
	
		
			
				
					|  |  |  |  |       checks['brakePressed'] += brake_pressed != self.safety.get_brake_pressed_prev() | 
			
		
	
		
			
				
					|  |  |  |  |       checks['regenBraking'] += CS.regenBraking != self.safety.get_regen_braking_prev() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       if self.CP.pcmCruise: | 
			
		
	
		
			
				
					|  |  |  |  |         # On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state. | 
			
		
	
		
			
				
					|  |  |  |  |         # On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but | 
			
		
	
		
			
				
					|  |  |  |  |         # openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages). | 
			
		
	
		
			
				
					|  |  |  |  |         if self.CP.carName == "honda" and self.CP.carFingerprint not in HONDA_BOSCH: | 
			
		
	
		
			
				
					|  |  |  |  |           # only the rising edges are expected to match | 
			
		
	
		
			
				
					|  |  |  |  |           if CS.cruiseState.enabled and not CS_prev.cruiseState.enabled: | 
			
		
	
		
			
				
					|  |  |  |  |             checks['controlsAllowed'] += not self.safety.get_controls_allowed() | 
			
		
	
		
			
				
					|  |  |  |  |         else: | 
			
		
	
		
			
				
					|  |  |  |  |           checks['controlsAllowed'] += not CS.cruiseState.enabled and self.safety.get_controls_allowed() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         # TODO: fix notCar mismatch | 
			
		
	
		
			
				
					|  |  |  |  |         if not self.CP.notCar: | 
			
		
	
		
			
				
					|  |  |  |  |           checks['cruiseState'] += CS.cruiseState.enabled != self.safety.get_cruise_engaged_prev() | 
			
		
	
		
			
				
					|  |  |  |  |       else: | 
			
		
	
		
			
				
					|  |  |  |  |         # Check for enable events on rising edge of controls allowed | 
			
		
	
		
			
				
					|  |  |  |  |         controlsd.update_events(CS) | 
			
		
	
		
			
				
					|  |  |  |  |         controlsd.CS_prev = CS | 
			
		
	
		
			
				
					|  |  |  |  |         button_enable = (any(evt.enable for evt in CS.events) and | 
			
		
	
		
			
				
					|  |  |  |  |                          not any(evt == EventName.pedalPressed for evt in controlsd.events.names)) | 
			
		
	
		
			
				
					|  |  |  |  |         mismatch = button_enable != (self.safety.get_controls_allowed() and not controls_allowed_prev) | 
			
		
	
		
			
				
					|  |  |  |  |         checks['controlsAllowed'] += mismatch | 
			
		
	
		
			
				
					|  |  |  |  |         controls_allowed_prev = self.safety.get_controls_allowed() | 
			
		
	
		
			
				
					|  |  |  |  |         if button_enable and not mismatch: | 
			
		
	
		
			
				
					|  |  |  |  |           self.safety.set_controls_allowed(False) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       if self.CP.carName == "honda": | 
			
		
	
		
			
				
					|  |  |  |  |         checks['mainOn'] += CS.cruiseState.available != self.safety.get_acc_main_on() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       CS_prev = CS | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     failed_checks = {k: v for k, v in checks.items() if v > 0} | 
			
		
	
		
			
				
					|  |  |  |  |     self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | @parameterized_class(('car_model', 'test_route'), get_test_cases()) | 
			
		
	
	
		
			
				
					|  |  |  | 
 |