pre-hypothesis

pull/30443/head
Shane Smiskol 2 years ago
parent f5279284f5
commit ab87a86540
  1. 2
      selfdrive/car/tests/routes.py
  2. 445
      selfdrive/car/tests/test_models.py
  3. 1
      selfdrive/car/toyota/carstate.py

@ -177,7 +177,7 @@ routes = [
CarTestRoute("2f37c007683e85ba|2023-09-02--14-39-44", TOYOTA.CAMRY), # openpilot longitudinal, with radar CAN filter CarTestRoute("2f37c007683e85ba|2023-09-02--14-39-44", TOYOTA.CAMRY), # openpilot longitudinal, with radar CAN filter
CarTestRoute("54034823d30962f5|2021-05-24--06-37-34", TOYOTA.CAMRY), # hybrid CarTestRoute("54034823d30962f5|2021-05-24--06-37-34", TOYOTA.CAMRY), # hybrid
CarTestRoute("3456ad0cd7281b24|2020-12-13--17-45-56", TOYOTA.CAMRY_TSS2), CarTestRoute("3456ad0cd7281b24|2020-12-13--17-45-56", TOYOTA.CAMRY_TSS2),
CarTestRoute("ffccc77938ddbc44|2021-01-04--16-55-41", TOYOTA.CAMRY_TSS2), # hybrid # CarTestRoute("ffccc77938ddbc44|2021-01-04--16-55-41", TOYOTA.CAMRY_TSS2), # hybrid
CarTestRoute("4e45c89c38e8ec4d|2021-05-02--02-49-28", TOYOTA.COROLLA), CarTestRoute("4e45c89c38e8ec4d|2021-05-02--02-49-28", TOYOTA.COROLLA),
CarTestRoute("5f5afb36036506e4|2019-05-14--02-09-54", TOYOTA.COROLLA_TSS2), CarTestRoute("5f5afb36036506e4|2019-05-14--02-09-54", TOYOTA.COROLLA_TSS2),
CarTestRoute("5ceff72287a5c86c|2019-10-19--10-59-02", TOYOTA.COROLLA_TSS2), # hybrid CarTestRoute("5ceff72287a5c86c|2019-10-19--10-59-02", TOYOTA.COROLLA_TSS2), # hybrid

@ -3,12 +3,13 @@ import capnp
import os import os
import importlib import importlib
import pytest import pytest
import random
import unittest import unittest
from collections import defaultdict, Counter from collections import defaultdict, Counter
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from parameterized import parameterized_class from parameterized import parameterized_class
from cereal import log, car from cereal import messaging, log, car
from openpilot.common.basedir import BASEDIR from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.realtime import DT_CTRL from openpilot.common.realtime import DT_CTRL
@ -178,210 +179,248 @@ class TestCarModelBase(unittest.TestCase):
self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
self.safety.init_tests() self.safety.init_tests()
def test_car_params(self): # def test_car_params(self):
if self.CP.dashcamOnly: # if self.CP.dashcamOnly:
self.skipTest("no need to check carParams for dashcamOnly") # self.skipTest("no need to check carParams for dashcamOnly")
#
# # make sure car params are within a valid range
# self.assertGreater(self.CP.mass, 1)
#
# if self.CP.steerControlType != car.CarParams.SteerControlType.angle:
# tuning = self.CP.lateralTuning.which()
# if tuning == 'pid':
# self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
# elif tuning == 'torque':
# self.assertTrue(self.CP.lateralTuning.torque.kf > 0)
# else:
# raise Exception("unknown tuning")
#
# def test_car_interface(self):
# # TODO: also check for checksum violations from can parser
# can_invalid_cnt = 0
# can_valid = False
# CC = car.CarControl.new_message()
#
# for i, msg in enumerate(self.can_msgs):
# CS = self.CI.update(CC, (msg.as_builder().to_bytes(),))
# self.CI.apply(CC, msg.logMonoTime)
#
# if CS.canValid:
# can_valid = True
#
# # wait max of 2s for low frequency msgs to be seen
# if i > 200 or can_valid:
# can_invalid_cnt += not CS.canValid
#
# self.assertEqual(can_invalid_cnt, 0)
#
# def test_radar_interface(self):
# os.environ['NO_RADAR_SLEEP'] = "1"
# RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface
# RI = RadarInterface(self.CP)
# assert RI
#
# # Since OBD port is multiplexed to bus 1 (commonly radar bus) while fingerprinting,
# # start parsing CAN messages after we've left ELM mode and can expect CAN traffic
# error_cnt = 0
# for i, msg in enumerate(self.can_msgs[self.elm_frame:]):
# rr = RI.update((msg.as_builder().to_bytes(),))
# if rr is not None and i > 50:
# error_cnt += car.RadarData.Error.canError in rr.errors
# self.assertEqual(error_cnt, 0)
#
# def test_panda_safety_rx_valid(self):
# if self.CP.dashcamOnly:
# self.skipTest("no need to check panda safety for dashcamOnly")
#
# start_ts = self.can_msgs[0].logMonoTime
#
# failed_addrs = Counter()
# for can in self.can_msgs:
# # update panda timer
# t = (can.logMonoTime - start_ts) / 1e3
# self.safety.set_timer(int(t))
#
# # run all msgs through the safety RX hook
# for msg in can.can:
# if msg.src >= 64:
# continue
#
# to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
# if self.safety.safety_rx_hook(to_send) != 1:
# failed_addrs[hex(msg.address)] += 1
#
# # ensure all msgs defined in the addr checks are valid
# if self.car_model not in ignore_addr_checks_valid:
# self.safety.safety_tick_current_rx_checks()
# if t > 1e6:
# self.assertTrue(self.safety.addr_checks_valid())
#
# # No need to check relay malfunction on disabled routes (relay closed),
# # or before fingerprinting is done (1s of tolerance to exit silent mode)
# if self.openpilot_enabled and t / 1e4 > (self.elm_frame + 100):
# self.assertFalse(self.safety.get_relay_malfunction())
# else:
# self.safety.set_relay_malfunction(False)
#
# self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}")
#
# def test_panda_safety_tx_cases(self, data=None):
# """Asserts we can tx common messages"""
# if self.CP.notCar:
# self.skipTest("Skipping test for notCar")
#
# def test_car_controller(car_control):
# now_nanos = 0
# msgs_sent = 0
# CI = self.CarInterface(self.CP, self.CarController, self.CarState)
# for _ in range(round(10.0 / DT_CTRL)): # make sure we hit the slowest messages
# CI.update(car_control, [])
# _, sendcan = CI.apply(car_control, now_nanos)
#
# now_nanos += DT_CTRL * 1e9
# msgs_sent += len(sendcan)
# for addr, _, dat, bus in sendcan:
# to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat)
# self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus))
#
# # Make sure we attempted to send messages
# self.assertGreater(msgs_sent, 50)
#
# # Make sure we can send all messages while inactive
# CC = car.CarControl.new_message()
# test_car_controller(CC)
#
# # Test cancel + general messages (controls_allowed=False & cruise_engaged=True)
# self.safety.set_cruise_engaged_prev(True)
# CC = car.CarControl.new_message(cruiseControl={'cancel': True})
# test_car_controller(CC)
#
# # Test resume + general messages (controls_allowed=True & cruise_engaged=True)
# self.safety.set_controls_allowed(True)
# CC = car.CarControl.new_message(cruiseControl={'resume': True})
# test_car_controller(CC)
def test_panda_safety_carstate_fuzzy(self):
bus = 0#random.randint(0, 3)
address = 466 # random.randint(0x200, 0x300)
for i in range(1000):
# self.setUp()
dat = os.urandom(8)
to_send = libpanda_py.make_CANPacket(address, bus, dat)
did_rx = self.safety.safety_rx_hook(to_send)
can = messaging.new_message('can', 1)
can.can = [log.CanData(address=address, dat=dat, src=bus)]
# make sure car params are within a valid range
self.assertGreater(self.CP.mass, 1)
if self.CP.steerControlType != car.CarParams.SteerControlType.angle:
tuning = self.CP.lateralTuning.which()
if tuning == 'pid':
self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
elif tuning == 'torque':
self.assertTrue(self.CP.lateralTuning.torque.kf > 0)
else:
raise Exception("unknown tuning")
def test_car_interface(self):
# TODO: also check for checksum violations from can parser
can_invalid_cnt = 0
can_valid = False
CC = car.CarControl.new_message() CC = car.CarControl.new_message()
CS = self.CI.update(CC, (can.to_bytes(), ))
for i, msg in enumerate(self.can_msgs):
CS = self.CI.update(CC, (msg.as_builder().to_bytes(),)) print('gas_pressed', CS.gasPressed, self.safety.get_gas_pressed_prev())
self.CI.apply(CC, msg.logMonoTime) print('wheel_speeds', CS.wheelSpeeds)
print('standstill', CS.standstill, not self.safety.get_vehicle_moving())
if CS.canValid:
can_valid = True print('did_rx', did_rx)
# if did_rx:
# wait max of 2s for low frequency msgs to be seen # self.assertFalse(True, 'finally did rx: {}'.format(i))
if i > 200 or can_valid:
can_invalid_cnt += not CS.canValid print('\nresults', self.safety.get_gas_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_brake_pressed_prev(), self.safety.get_regen_braking_prev(), self.safety.get_cruise_engaged_prev(), self.safety.get_acc_main_on())
self.assertEqual(can_invalid_cnt, 0) # self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev())
# self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving())
def test_radar_interface(self): # self.assertEqual(CS.brakePressed, self.safety.get_brake_pressed_prev())
os.environ['NO_RADAR_SLEEP'] = "1" # self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev())
RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface #
RI = RadarInterface(self.CP) # if self.CP.pcmCruise:
assert RI # self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev())
#
# Since OBD port is multiplexed to bus 1 (commonly radar bus) while fingerprinting, # if self.CP.carName == "honda":
# start parsing CAN messages after we've left ELM mode and can expect CAN traffic # self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on())
error_cnt = 0
for i, msg in enumerate(self.can_msgs[self.elm_frame:]):
rr = RI.update((msg.as_builder().to_bytes(),)) # def test_panda_safety_carstate(self):
if rr is not None and i > 50: # """
error_cnt += car.RadarData.Error.canError in rr.errors # Assert that panda safety matches openpilot's carState
self.assertEqual(error_cnt, 0) # """
# if self.CP.dashcamOnly:
def test_panda_safety_rx_valid(self): # self.skipTest("no need to check panda safety for dashcamOnly")
if self.CP.dashcamOnly: #
self.skipTest("no need to check panda safety for dashcamOnly") # CC = car.CarControl.new_message()
#
start_ts = self.can_msgs[0].logMonoTime # # warm up pass, as initial states may be different
# for can in self.can_msgs[:300]:
failed_addrs = Counter() # self.CI.update(CC, (can.as_builder().to_bytes(), ))
for can in self.can_msgs: # for msg in filter(lambda m: m.src in range(64), can.can):
# update panda timer # to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
t = (can.logMonoTime - start_ts) / 1e3 # self.safety.safety_rx_hook(to_send)
self.safety.set_timer(int(t)) #
# controls_allowed_prev = False
# run all msgs through the safety RX hook # CS_prev = car.CarState.new_message()
for msg in can.can: # checks = defaultdict(lambda: 0)
if msg.src >= 64: # controlsd = Controls(CI=self.CI)
continue # controlsd.initialized = True
# for idx, can in enumerate(self.can_msgs):
to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) # CS = self.CI.update(CC, (can.as_builder().to_bytes(), ))
if self.safety.safety_rx_hook(to_send) != 1: # for msg in filter(lambda m: m.src in range(64), can.can):
failed_addrs[hex(msg.address)] += 1 # to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
# ret = self.safety.safety_rx_hook(to_send)
# ensure all msgs defined in the addr checks are valid # self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}")
if self.car_model not in ignore_addr_checks_valid: #
self.safety.safety_tick_current_rx_checks() # # Skip first frame so CS_prev is properly initialized
if t > 1e6: # if idx == 0:
self.assertTrue(self.safety.addr_checks_valid()) # CS_prev = CS
# # Button may be left pressed in warm up period
# No need to check relay malfunction on disabled routes (relay closed), # if not self.CP.pcmCruise:
# or before fingerprinting is done (1s of tolerance to exit silent mode) # self.safety.set_controls_allowed(0)
if self.openpilot_enabled and t / 1e4 > (self.elm_frame + 100): # continue
self.assertFalse(self.safety.get_relay_malfunction()) #
else: # # TODO: check rest of panda's carstate (steering, ACC main on, etc.)
self.safety.set_relay_malfunction(False) #
# checks['gasPressed'] += CS.gasPressed != self.safety.get_gas_pressed_prev()
self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") # checks['standstill'] += CS.standstill == self.safety.get_vehicle_moving()
#
def test_panda_safety_tx_cases(self, data=None): # # TODO: remove this exception once this mismatch is resolved
"""Asserts we can tx common messages""" # brake_pressed = CS.brakePressed
if self.CP.notCar: # if CS.brakePressed and not self.safety.get_brake_pressed_prev():
self.skipTest("Skipping test for notCar") # if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05:
# brake_pressed = False
def test_car_controller(car_control): # checks['brakePressed'] += brake_pressed != self.safety.get_brake_pressed_prev()
now_nanos = 0 # checks['regenBraking'] += CS.regenBraking != self.safety.get_regen_braking_prev()
msgs_sent = 0 #
CI = self.CarInterface(self.CP, self.CarController, self.CarState) # if self.CP.pcmCruise:
for _ in range(round(10.0 / DT_CTRL)): # make sure we hit the slowest messages # # On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state.
CI.update(car_control, []) # # On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but
_, sendcan = CI.apply(car_control, now_nanos) # # openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages).
# if self.CP.carName == "honda" and self.CP.carFingerprint not in HONDA_BOSCH:
now_nanos += DT_CTRL * 1e9 # # only the rising edges are expected to match
msgs_sent += len(sendcan) # if CS.cruiseState.enabled and not CS_prev.cruiseState.enabled:
for addr, _, dat, bus in sendcan: # checks['controlsAllowed'] += not self.safety.get_controls_allowed()
to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat) # else:
self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus)) # checks['controlsAllowed'] += not CS.cruiseState.enabled and self.safety.get_controls_allowed()
#
# Make sure we attempted to send messages # # TODO: fix notCar mismatch
self.assertGreater(msgs_sent, 50) # if not self.CP.notCar:
# checks['cruiseState'] += CS.cruiseState.enabled != self.safety.get_cruise_engaged_prev()
# Make sure we can send all messages while inactive # else:
CC = car.CarControl.new_message() # # Check for enable events on rising edge of controls allowed
test_car_controller(CC) # controlsd.update_events(CS)
# controlsd.CS_prev = CS
# Test cancel + general messages (controls_allowed=False & cruise_engaged=True) # button_enable = (any(evt.enable for evt in CS.events) and
self.safety.set_cruise_engaged_prev(True) # not any(evt == EventName.pedalPressed for evt in controlsd.events.names))
CC = car.CarControl.new_message(cruiseControl={'cancel': True}) # mismatch = button_enable != (self.safety.get_controls_allowed() and not controls_allowed_prev)
test_car_controller(CC) # checks['controlsAllowed'] += mismatch
# controls_allowed_prev = self.safety.get_controls_allowed()
# Test resume + general messages (controls_allowed=True & cruise_engaged=True) # if button_enable and not mismatch:
self.safety.set_controls_allowed(True) # self.safety.set_controls_allowed(False)
CC = car.CarControl.new_message(cruiseControl={'resume': True}) #
test_car_controller(CC) # if self.CP.carName == "honda":
# checks['mainOn'] += CS.cruiseState.available != self.safety.get_acc_main_on()
def test_panda_safety_carstate(self): #
""" # CS_prev = CS
Assert that panda safety matches openpilot's carState #
""" # failed_checks = {k: v for k, v in checks.items() if v > 0}
if self.CP.dashcamOnly: # self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}")
self.skipTest("no need to check panda safety for dashcamOnly")
CC = car.CarControl.new_message()
# warm up pass, as initial states may be different
for can in self.can_msgs[:300]:
self.CI.update(CC, (can.as_builder().to_bytes(), ))
for msg in filter(lambda m: m.src in range(64), can.can):
to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
self.safety.safety_rx_hook(to_send)
controls_allowed_prev = False
CS_prev = car.CarState.new_message()
checks = defaultdict(lambda: 0)
controlsd = Controls(CI=self.CI)
controlsd.initialized = True
for idx, can in enumerate(self.can_msgs):
CS = self.CI.update(CC, (can.as_builder().to_bytes(), ))
for msg in filter(lambda m: m.src in range(64), can.can):
to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
ret = self.safety.safety_rx_hook(to_send)
self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}")
# Skip first frame so CS_prev is properly initialized
if idx == 0:
CS_prev = CS
# Button may be left pressed in warm up period
if not self.CP.pcmCruise:
self.safety.set_controls_allowed(0)
continue
# TODO: check rest of panda's carstate (steering, ACC main on, etc.)
checks['gasPressed'] += CS.gasPressed != self.safety.get_gas_pressed_prev()
checks['standstill'] += CS.standstill == self.safety.get_vehicle_moving()
# TODO: remove this exception once this mismatch is resolved
brake_pressed = CS.brakePressed
if CS.brakePressed and not self.safety.get_brake_pressed_prev():
if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05:
brake_pressed = False
checks['brakePressed'] += brake_pressed != self.safety.get_brake_pressed_prev()
checks['regenBraking'] += CS.regenBraking != self.safety.get_regen_braking_prev()
if self.CP.pcmCruise:
# On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state.
# On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but
# openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages).
if self.CP.carName == "honda" and self.CP.carFingerprint not in HONDA_BOSCH:
# only the rising edges are expected to match
if CS.cruiseState.enabled and not CS_prev.cruiseState.enabled:
checks['controlsAllowed'] += not self.safety.get_controls_allowed()
else:
checks['controlsAllowed'] += not CS.cruiseState.enabled and self.safety.get_controls_allowed()
# TODO: fix notCar mismatch
if not self.CP.notCar:
checks['cruiseState'] += CS.cruiseState.enabled != self.safety.get_cruise_engaged_prev()
else:
# Check for enable events on rising edge of controls allowed
controlsd.update_events(CS)
controlsd.CS_prev = CS
button_enable = (any(evt.enable for evt in CS.events) and
not any(evt == EventName.pedalPressed for evt in controlsd.events.names))
mismatch = button_enable != (self.safety.get_controls_allowed() and not controls_allowed_prev)
checks['controlsAllowed'] += mismatch
controls_allowed_prev = self.safety.get_controls_allowed()
if button_enable and not mismatch:
self.safety.set_controls_allowed(False)
if self.CP.carName == "honda":
checks['mainOn'] += CS.cruiseState.available != self.safety.get_acc_main_on()
CS_prev = CS
failed_checks = {k: v for k, v in checks.items() if v > 0}
self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}")
@parameterized_class(('car_model', 'test_route'), get_test_cases()) @parameterized_class(('car_model', 'test_route'), get_test_cases())

@ -60,6 +60,7 @@ class CarState(CarStateBase):
else: else:
# TODO: find a common gas pedal percentage signal # TODO: find a common gas pedal percentage signal
ret.gasPressed = cp.vl["PCM_CRUISE"]["GAS_RELEASED"] == 0 ret.gasPressed = cp.vl["PCM_CRUISE"]["GAS_RELEASED"] == 0
print(cp.vl["PCM_CRUISE"]["GAS_RELEASED"])
ret.wheelSpeeds = self.get_wheel_speeds( ret.wheelSpeeds = self.get_wheel_speeds(
cp.vl["WHEEL_SPEEDS"]["WHEEL_SPEED_FL"], cp.vl["WHEEL_SPEEDS"]["WHEEL_SPEED_FL"],

Loading…
Cancel
Save