openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

156 lines
6.0 KiB

#!/usr/bin/env python3
import itertools
import numpy as np
import unittest
from parameterized import parameterized_class
from cereal import log
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, V_CRUISE_MIN, V_CRUISE_MAX, V_CRUISE_INITIAL, IMPERIAL_INCREMENT
from cereal import car
from openpilot.common.conversions import Conversions as CV
from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver
ButtonEvent = car.CarState.ButtonEvent
ButtonType = car.CarState.ButtonEvent.Type
def run_cruise_simulation(cruise, e2e, personality, t_end=20.):
man = Maneuver(
'',
duration=t_end,
initial_speed=max(cruise - 1., 0.0),
lead_relevancy=True,
initial_distance_lead=100,
cruise_values=[cruise],
prob_lead_values=[0.0],
breakpoints=[0.],
e2e=e2e,
personality=personality,
)
valid, output = man.evaluate()
assert valid
return output[-1, 3]
@parameterized_class(("e2e", "personality", "speed"), itertools.product(
[True, False], # e2e
log.LongitudinalPersonality.schema.enumerants, # personality
[5,35])) # speed
class TestCruiseSpeed(unittest.TestCase):
def test_cruise_speed(self):
print(f'Testing {self.speed} m/s')
cruise_speed = float(self.speed)
simulation_steady_state = run_cruise_simulation(cruise_speed, self.e2e, self.personality)
self.assertAlmostEqual(simulation_steady_state, cruise_speed, delta=.01, msg=f'Did not reach {self.speed} m/s')
# TODO: test pcmCruise
@parameterized_class(('pcm_cruise',), [(False,)])
class TestVCruiseHelper(unittest.TestCase):
def setUp(self):
self.CP = car.CarParams(pcmCruise=self.pcm_cruise)
self.v_cruise_helper = VCruiseHelper(self.CP)
self.reset_cruise_speed_state()
def reset_cruise_speed_state(self):
# Two resets previous cruise speed
for _ in range(2):
self.v_cruise_helper.update_v_cruise(car.CarState(cruiseState={"available": False}), enabled=False, is_metric=False)
def enable(self, v_ego, experimental_mode):
# Simulates user pressing set with a current speed
self.v_cruise_helper.initialize_v_cruise(car.CarState(vEgo=v_ego), experimental_mode)
def test_adjust_speed(self):
"""
Asserts speed changes on falling edges of buttons.
"""
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
for btn in (ButtonType.accelCruise, ButtonType.decelCruise):
for pressed in (True, False):
CS = car.CarState(cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=btn, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
self.assertEqual(pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_rising_edge_enable(self):
"""
Some car interfaces may enable on rising edge of a button,
ensure we don't adjust speed if enabled changes mid-press.
"""
# NOTE: enabled is always one frame behind the result from button press in controlsd
for enabled, pressed in ((False, False),
(False, True),
(True, False)):
CS = car.CarState(cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=enabled, is_metric=False)
if pressed:
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
# Expected diff on enabling. Speed should not change on falling edge of pressed
self.assertEqual(not pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_resume_in_standstill(self):
"""
Asserts we don't increment set speed if user presses resume/accel to exit cruise standstill.
"""
self.enable(0, False)
for standstill in (True, False):
for pressed in (True, False):
CS = car.CarState(cruiseState={"available": True, "standstill": standstill})
CS.buttonEvents = [ButtonEvent(type=ButtonType.accelCruise, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
# speed should only update if not at standstill and button falling edge
should_equal = standstill or pressed
self.assertEqual(should_equal, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_set_gas_pressed(self):
"""
Asserts pressing set while enabled with gas pressed sets
the speed to the maximum of vEgo and current cruise speed.
"""
for v_ego in np.linspace(0, 100, 101):
self.reset_cruise_speed_state()
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
# first decrement speed, then perform gas pressed logic
expected_v_cruise_kph = self.v_cruise_helper.v_cruise_kph - IMPERIAL_INCREMENT
expected_v_cruise_kph = max(expected_v_cruise_kph, v_ego * CV.MS_TO_KPH) # clip to min of vEgo
expected_v_cruise_kph = float(np.clip(round(expected_v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX))
CS = car.CarState(vEgo=float(v_ego), gasPressed=True, cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=False)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
# TODO: fix skipping first run due to enabled on rising edge exception
if v_ego == 0.0:
continue
self.assertEqual(expected_v_cruise_kph, self.v_cruise_helper.v_cruise_kph)
def test_initialize_v_cruise(self):
"""
Asserts allowed cruise speeds on enabling with SET.
"""
for experimental_mode in (True, False):
for v_ego in np.linspace(0, 100, 101):
self.reset_cruise_speed_state()
self.assertFalse(self.v_cruise_helper.v_cruise_initialized)
self.enable(float(v_ego), experimental_mode)
self.assertTrue(V_CRUISE_INITIAL <= self.v_cruise_helper.v_cruise_kph <= V_CRUISE_MAX)
self.assertTrue(self.v_cruise_helper.v_cruise_initialized)
if __name__ == "__main__":
unittest.main()