Cache FW query (#1025)
* split fw query and matching
* Read cached firmware versions
* add tests
* this works
old-commit-hash: b7aeb5d64d
commatwo_master
parent
488a4707b0
commit
1dee8638d6
10 changed files with 72 additions and 938 deletions
@ -1,132 +0,0 @@ |
|||||||
#!/usr/bin/env python3 |
|
||||||
import os |
|
||||||
import unittest |
|
||||||
import requests |
|
||||||
from cereal import car |
|
||||||
|
|
||||||
from tools.lib.logreader import LogReader |
|
||||||
|
|
||||||
from opendbc.can.parser import CANParser |
|
||||||
|
|
||||||
from selfdrive.car.honda.values import CAR as HONDA |
|
||||||
from selfdrive.car.honda.interface import CarInterface as HondaCarInterface |
|
||||||
from selfdrive.car.honda.carcontroller import CarController as HondaCarController |
|
||||||
from selfdrive.car.honda.radar_interface import RadarInterface as HondaRadarInterface |
|
||||||
|
|
||||||
from selfdrive.car.toyota.values import CAR as TOYOTA |
|
||||||
from selfdrive.car.toyota.interface import CarInterface as ToyotaCarInterface |
|
||||||
from selfdrive.car.toyota.carcontroller import CarController as ToyotaCarController |
|
||||||
from selfdrive.car.toyota.radar_interface import RadarInterface as ToyotaRadarInterface |
|
||||||
|
|
||||||
BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/" |
|
||||||
|
|
||||||
def run_route(route, car_name, CarInterface, CarController): |
|
||||||
lr = LogReader("/tmp/"+route + ".bz2") |
|
||||||
print(lr) |
|
||||||
|
|
||||||
cps = [] |
|
||||||
def CANParserHook(dbc_name, signals, checks=None, bus=0, sendcan=False, tcp_addr="127.0.0.1", timeout=-1): |
|
||||||
cp = CANParser(dbc_name, signals, checks, bus, sendcan, "", timeout) |
|
||||||
cps.append(cp) |
|
||||||
return cp |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController, CANParserHook) |
|
||||||
print(CI) |
|
||||||
|
|
||||||
i = 0 |
|
||||||
last_monotime = 0 |
|
||||||
for msg in lr: |
|
||||||
if msg.which() == 'can': |
|
||||||
msg_bytes = msg.as_builder().to_bytes() |
|
||||||
monotime = msg.logMonoTime |
|
||||||
for x in cps: |
|
||||||
x.update_string(monotime, msg_bytes) |
|
||||||
|
|
||||||
if (monotime-last_monotime) > 0.01: |
|
||||||
control = car.CarControl.new_message() |
|
||||||
CS = CI.update(control) |
|
||||||
if i % 100 == 0: |
|
||||||
print('\033[2J\033[H'+str(CS)) |
|
||||||
last_monotime = monotime |
|
||||||
i += 1 |
|
||||||
|
|
||||||
return True |
|
||||||
|
|
||||||
def run_route_radar(route, car_name, RadarInterface, CarInterface): |
|
||||||
lr = LogReader("/tmp/"+route + ".bz2") |
|
||||||
print(lr) |
|
||||||
|
|
||||||
cps = [] |
|
||||||
def CANParserHook(dbc_name, signals, checks=None, bus=0, sendcan=False, tcp_addr="127.0.0.1", timeout=-1): |
|
||||||
cp = CANParser(dbc_name, signals, checks, bus, sendcan, "", timeout) |
|
||||||
print(signals) |
|
||||||
cps.append(cp) |
|
||||||
return cp |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
RI = RadarInterface(params, CANParserHook) |
|
||||||
|
|
||||||
i = 0 |
|
||||||
updated_messages = set() |
|
||||||
for msg in lr: |
|
||||||
if msg.which() == 'can': |
|
||||||
msg_bytes = msg.as_builder().to_bytes() |
|
||||||
_, vls = cps[0].update_string(msg.logMonoTime, msg_bytes) |
|
||||||
updated_messages.update(vls) |
|
||||||
if RI.trigger_msg in updated_messages: |
|
||||||
ret = RI._update(updated_messages) |
|
||||||
if i % 10 == 0: |
|
||||||
print('\033[2J\033[H'+str(ret)) |
|
||||||
updated_messages = set() |
|
||||||
i += 1 |
|
||||||
|
|
||||||
return True |
|
||||||
|
|
||||||
|
|
||||||
# TODO: make this generic |
|
||||||
class TestCarInterface(unittest.TestCase): |
|
||||||
def setUp(self): |
|
||||||
self.routes = { |
|
||||||
HONDA.CIVIC: "b0c9d2329ad1606b|2019-05-30--20-23-57", |
|
||||||
HONDA.ACCORD: "0375fdf7b1ce594d|2019-05-21--20-10-33", |
|
||||||
TOYOTA.PRIUS: "38bfd238edecbcd7|2019-06-07--10-15-25", |
|
||||||
TOYOTA.RAV4: "02ec6bea180a4d36|2019-04-17--11-21-35" |
|
||||||
} |
|
||||||
|
|
||||||
for route in self.routes.values(): |
|
||||||
route_filename = route + ".bz2" |
|
||||||
if not os.path.isfile("/tmp/"+route_filename): |
|
||||||
with open("/tmp/"+route + ".bz2", "w") as f: |
|
||||||
f.write(requests.get(BASE_URL + route_filename).content) |
|
||||||
|
|
||||||
def test_parser_civic(self): |
|
||||||
#self.assertTrue(run_route(self.routes[HONDA.CIVIC], HONDA.CIVIC, HondaCarInterface, HondaCarController)) |
|
||||||
pass |
|
||||||
|
|
||||||
def test_parser_accord(self): |
|
||||||
# one honda |
|
||||||
#self.assertTrue(run_route(self.routes[HONDA.ACCORD], HONDA.ACCORD, HondaCarInterface, HondaCarController)) |
|
||||||
pass |
|
||||||
|
|
||||||
def test_parser_prius(self): |
|
||||||
#self.assertTrue(run_route(self.routes[TOYOTA.PRIUS], TOYOTA.PRIUS, ToyotaCarInterface, ToyotaCarController)) |
|
||||||
pass |
|
||||||
|
|
||||||
def test_parser_rav4(self): |
|
||||||
# hmm, rav4 is broken |
|
||||||
#self.assertTrue(run_route(self.routes[TOYOTA.RAV4], TOYOTA.RAV4, ToyotaCarInterface, ToyotaCarController)) |
|
||||||
pass |
|
||||||
|
|
||||||
def test_radar_civic(self): |
|
||||||
#self.assertTrue(run_route_radar(self.routes[HONDA.CIVIC], HONDA.CIVIC, HondaRadarInterface, HondaCarInterface)) |
|
||||||
pass |
|
||||||
|
|
||||||
def test_radar_prius(self): |
|
||||||
self.assertTrue(run_route_radar(self.routes[TOYOTA.PRIUS], TOYOTA.PRIUS, ToyotaRadarInterface, ToyotaCarInterface)) |
|
||||||
pass |
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__": |
|
||||||
unittest.main() |
|
||||||
|
|
@ -0,0 +1,46 @@ |
|||||||
|
#!/usr/bin/env python3 |
||||||
|
import unittest |
||||||
|
from cereal import car |
||||||
|
from selfdrive.car.fw_versions import match_fw_to_car |
||||||
|
from selfdrive.car.toyota.values import CAR as TOYOTA |
||||||
|
|
||||||
|
CarFw = car.CarParams.CarFw |
||||||
|
Ecu = car.CarParams.Ecu |
||||||
|
|
||||||
|
|
||||||
|
class TestFwFingerprint(unittest.TestCase): |
||||||
|
def assertFingerprints(self, candidates, expected): |
||||||
|
candidates = list(candidates) |
||||||
|
self.assertEqual(len(candidates), 1) |
||||||
|
self.assertEqual(candidates[0], TOYOTA.RAV4_TSS2) |
||||||
|
|
||||||
|
def test_rav4_tss2(self): |
||||||
|
CP = car.CarParams.new_message() |
||||||
|
CP.carFw = [ |
||||||
|
{"ecu": Ecu.esp, |
||||||
|
"fwVersion": b"\x01F15260R210\x00\x00\x00\x00\x00\x00", |
||||||
|
"address": 1968, |
||||||
|
"subAddress": 0}, |
||||||
|
{"ecu": Ecu.engine, |
||||||
|
"fwVersion": b"\x028966342Y8000\x00\x00\x00\x00897CF1201001\x00\x00\x00\x00", |
||||||
|
"address": 1792, |
||||||
|
"subAddress": 0}, |
||||||
|
{"ecu": Ecu.eps, |
||||||
|
"fwVersion": b"\x028965B0R01200\x00\x00\x00\x008965B0R02200\x00\x00\x00\x00", |
||||||
|
"address": 1953, |
||||||
|
"subAddress": 0}, |
||||||
|
{"ecu": Ecu.fwdRadar, |
||||||
|
"fwVersion": b"\x018821F3301200\x00\x00\x00\x00", |
||||||
|
"address": 1872, |
||||||
|
"subAddress": 15}, |
||||||
|
{"ecu": Ecu.fwdCamera, |
||||||
|
"fwVersion": b"\x028646F4203300\x00\x00\x00\x008646G26011A0\x00\x00\x00\x00", |
||||||
|
"address": 1872, |
||||||
|
"subAddress": 109} |
||||||
|
] |
||||||
|
|
||||||
|
self.assertFingerprints(match_fw_to_car(CP.carFw), TOYOTA.RAV4_TSS2) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
unittest.main() |
@ -1,435 +0,0 @@ |
|||||||
#!/usr/bin/env python3 |
|
||||||
|
|
||||||
import unittest |
|
||||||
from cereal import car, log |
|
||||||
from selfdrive.car.honda.values import CAR as HONDA |
|
||||||
from selfdrive.car.honda.carcontroller import CarController |
|
||||||
from selfdrive.car.honda.interface import CarInterface |
|
||||||
from common.realtime import sec_since_boot |
|
||||||
|
|
||||||
from selfdrive.boardd.boardd import can_list_to_can_capnp |
|
||||||
from selfdrive.config import Conversions as CV |
|
||||||
import cereal.messaging as messaging |
|
||||||
from cereal.services import service_list |
|
||||||
from opendbc.can.parser import CANParser |
|
||||||
|
|
||||||
import zmq |
|
||||||
import time |
|
||||||
import numpy as np |
|
||||||
|
|
||||||
|
|
||||||
class TestHondaCarcontroller(unittest.TestCase): |
|
||||||
def test_honda_lkas_hud(self): |
|
||||||
self.longMessage = True |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
car_name = HONDA.CIVIC |
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('SET_ME_X41', 'LKAS_HUD', 0), |
|
||||||
('SET_ME_X48', 'LKAS_HUD', 0), |
|
||||||
('STEERING_REQUIRED', 'LKAS_HUD', 0), |
|
||||||
('SOLID_LANES', 'LKAS_HUD', 0), |
|
||||||
('LEAD_SPEED', 'RADAR_HUD', 0), |
|
||||||
('LEAD_STATE', 'RADAR_HUD', 0), |
|
||||||
('LEAD_DISTANCE', 'RADAR_HUD', 0), |
|
||||||
('ACC_ALERTS', 'RADAR_HUD', 0), |
|
||||||
] |
|
||||||
|
|
||||||
VA = car.CarControl.HUDControl.VisualAlert |
|
||||||
|
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
|
|
||||||
alerts = { |
|
||||||
VA.none: 0, |
|
||||||
VA.brakePressed: 10, |
|
||||||
VA.wrongGear: 6, |
|
||||||
VA.seatbeltUnbuckled: 5, |
|
||||||
VA.speedTooHigh: 8, |
|
||||||
} |
|
||||||
|
|
||||||
for steer_required in [True, False]: |
|
||||||
for lanes in [True, False]: |
|
||||||
for alert in alerts.keys(): |
|
||||||
control = car.CarControl.new_message() |
|
||||||
hud = car.CarControl.HUDControl.new_message() |
|
||||||
|
|
||||||
control.enabled = True |
|
||||||
|
|
||||||
if steer_required: |
|
||||||
hud.visualAlert = VA.steerRequired |
|
||||||
else: |
|
||||||
hud.visualAlert = alert |
|
||||||
|
|
||||||
hud.lanesVisible = lanes |
|
||||||
control.hudControl = hud |
|
||||||
|
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertEqual(0x41, parser.vl['LKAS_HUD']['SET_ME_X41']) |
|
||||||
self.assertEqual(0x48, parser.vl['LKAS_HUD']['SET_ME_X48']) |
|
||||||
self.assertEqual(steer_required, parser.vl['LKAS_HUD']['STEERING_REQUIRED']) |
|
||||||
self.assertEqual(lanes, parser.vl['LKAS_HUD']['SOLID_LANES']) |
|
||||||
|
|
||||||
self.assertEqual(0x1fe, parser.vl['RADAR_HUD']['LEAD_SPEED']) |
|
||||||
self.assertEqual(0x7, parser.vl['RADAR_HUD']['LEAD_STATE']) |
|
||||||
self.assertEqual(0x1e, parser.vl['RADAR_HUD']['LEAD_DISTANCE']) |
|
||||||
self.assertEqual(alerts[alert] if not steer_required else 0, parser.vl['RADAR_HUD']['ACC_ALERTS']) |
|
||||||
|
|
||||||
def test_honda_ui_cruise_speed(self): |
|
||||||
self.longMessage = True |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
car_name = HONDA.CIVIC |
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
# 780 - 0x30c |
|
||||||
('CRUISE_SPEED', 'ACC_HUD', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for cruise_speed in np.linspace(0, 50, 20): |
|
||||||
for visible in [False, True]: |
|
||||||
control = car.CarControl.new_message() |
|
||||||
hud = car.CarControl.HUDControl.new_message() |
|
||||||
hud.setSpeed = float(cruise_speed) |
|
||||||
hud.speedVisible = visible |
|
||||||
control.enabled = True |
|
||||||
control.hudControl = hud |
|
||||||
|
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
expected_cruise_speed = round(cruise_speed * CV.MS_TO_KPH) |
|
||||||
if not visible: |
|
||||||
expected_cruise_speed = 255 |
|
||||||
|
|
||||||
self.assertAlmostEqual(parser.vl['ACC_HUD']['CRUISE_SPEED'], expected_cruise_speed, msg="Car: %s, speed: %.2f" % (car_name, cruise_speed)) |
|
||||||
|
|
||||||
def test_honda_ui_pcm_accel(self): |
|
||||||
self.longMessage = True |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
car_name = HONDA.CIVIC |
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
# 780 - 0x30c |
|
||||||
('PCM_GAS', 'ACC_HUD', 0), |
|
||||||
|
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for pcm_accel in np.linspace(0, 1, 25): |
|
||||||
cc = car.CarControl.CruiseControl.new_message() |
|
||||||
cc.accelOverride = float(pcm_accel) |
|
||||||
control = car.CarControl.new_message() |
|
||||||
control.enabled = True |
|
||||||
control.cruiseControl = cc |
|
||||||
|
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertAlmostEqual(parser.vl['ACC_HUD']['PCM_GAS'], int(0xc6 * pcm_accel), msg="Car: %s, accel: %.2f" % (car_name, pcm_accel)) |
|
||||||
|
|
||||||
def test_honda_ui_pcm_speed(self): |
|
||||||
self.longMessage = True |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
car_name = HONDA.CIVIC |
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
# 780 - 0x30c |
|
||||||
('PCM_SPEED', 'ACC_HUD', 99), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for pcm_speed in np.linspace(0, 100, 20): |
|
||||||
cc = car.CarControl.CruiseControl.new_message() |
|
||||||
cc.speedOverride = float(pcm_speed * CV.KPH_TO_MS) |
|
||||||
control = car.CarControl.new_message() |
|
||||||
control.enabled = True |
|
||||||
control.cruiseControl = cc |
|
||||||
|
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertAlmostEqual(parser.vl['ACC_HUD']['PCM_SPEED'], round(pcm_speed, 2), msg="Car: %s, speed: %.2f" % (car_name, pcm_speed)) |
|
||||||
|
|
||||||
def test_honda_ui_hud_lead(self): |
|
||||||
self.longMessage = True |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
for car_name in [HONDA.CIVIC]: |
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
# 780 - 0x30c |
|
||||||
# 3: acc off, 2: solid car (hud_show_car), 1: dashed car (enabled, not hud show car), 0: no car (not enabled) |
|
||||||
('HUD_LEAD', 'ACC_HUD', 99), |
|
||||||
('SET_ME_X03', 'ACC_HUD', 99), |
|
||||||
('SET_ME_X03_2', 'ACC_HUD', 99), |
|
||||||
('SET_ME_X01', 'ACC_HUD', 99), |
|
||||||
('ENABLE_MINI_CAR', 'ACC_HUD', 99), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for enabled in [True, False]: |
|
||||||
for leadVisible in [True, False]: |
|
||||||
|
|
||||||
control = car.CarControl.new_message() |
|
||||||
hud = car.CarControl.HUDControl.new_message() |
|
||||||
hud.leadVisible = leadVisible |
|
||||||
control.enabled = enabled |
|
||||||
control.hudControl = hud |
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
if not enabled: |
|
||||||
hud_lead = 0 |
|
||||||
else: |
|
||||||
hud_lead = 2 if leadVisible else 1 |
|
||||||
self.assertEqual(int(parser.vl['ACC_HUD']['HUD_LEAD']), hud_lead, msg="Car: %s, lead: %s, enabled %s" % (car_name, leadVisible, enabled)) |
|
||||||
self.assertTrue(parser.vl['ACC_HUD']['ENABLE_MINI_CAR']) |
|
||||||
self.assertEqual(0x3, parser.vl['ACC_HUD']['SET_ME_X03']) |
|
||||||
self.assertEqual(0x3, parser.vl['ACC_HUD']['SET_ME_X03_2']) |
|
||||||
self.assertEqual(0x1, parser.vl['ACC_HUD']['SET_ME_X01']) |
|
||||||
|
|
||||||
|
|
||||||
def test_honda_steering(self): |
|
||||||
self.longMessage = True |
|
||||||
limits = { |
|
||||||
HONDA.CIVIC: 0x1000, |
|
||||||
HONDA.ODYSSEY: 0x1000, |
|
||||||
HONDA.PILOT: 0x1000, |
|
||||||
HONDA.CRV: 0x3e8, |
|
||||||
HONDA.ACURA_ILX: 0xF00, |
|
||||||
HONDA.ACURA_RDX: 0x3e8, |
|
||||||
} |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
for car_name in limits.keys(): |
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('STEER_TORQUE', 'STEERING_CONTROL', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for steer in np.linspace(-1., 1., 25): |
|
||||||
control = car.CarControl.new_message() |
|
||||||
actuators = car.CarControl.Actuators.new_message() |
|
||||||
actuators.steer = float(steer) |
|
||||||
control.enabled = True |
|
||||||
control.actuators = actuators |
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
CI.CS.steer_not_allowed = False |
|
||||||
|
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
torque = parser.vl['STEERING_CONTROL']['STEER_TORQUE'] |
|
||||||
self.assertAlmostEqual(int(limits[car_name] * -actuators.steer), torque, msg="Car: %s, steer %.2f" % (car_name, steer)) |
|
||||||
|
|
||||||
sendcan.close() |
|
||||||
|
|
||||||
def test_honda_gas(self): |
|
||||||
self.longMessage = True |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
car_name = HONDA.ACURA_ILX |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name, {0: {0x201: 6}, 1: {}, 2: {}}) # Add interceptor to fingerprint |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('GAS_COMMAND', 'GAS_COMMAND', -1), |
|
||||||
('GAS_COMMAND2', 'GAS_COMMAND', -1), |
|
||||||
('ENABLE', 'GAS_COMMAND', -1), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for gas in np.linspace(0., 0.95, 25): |
|
||||||
control = car.CarControl.new_message() |
|
||||||
actuators = car.CarControl.Actuators.new_message() |
|
||||||
actuators.gas = float(gas) |
|
||||||
control.enabled = True |
|
||||||
control.actuators = actuators |
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
CI.CS.steer_not_allowed = False |
|
||||||
|
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
gas_command = parser.vl['GAS_COMMAND']['GAS_COMMAND'] / 255.0 |
|
||||||
gas_command2 = parser.vl['GAS_COMMAND']['GAS_COMMAND2'] / 255.0 |
|
||||||
enabled = gas > 0.001 |
|
||||||
self.assertEqual(enabled, parser.vl['GAS_COMMAND']['ENABLE'], msg="Car: %s, gas %.2f" % (car_name, gas)) |
|
||||||
if enabled: |
|
||||||
self.assertAlmostEqual(gas, gas_command, places=2, msg="Car: %s, gas %.2f" % (car_name, gas)) |
|
||||||
self.assertAlmostEqual(gas, gas_command2, places=2, msg="Car: %s, gas %.2f" % (car_name, gas)) |
|
||||||
|
|
||||||
sendcan.close() |
|
||||||
|
|
||||||
def test_honda_brake(self): |
|
||||||
self.longMessage = True |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
car_name = HONDA.CIVIC |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('COMPUTER_BRAKE', 'BRAKE_COMMAND', 0), |
|
||||||
('BRAKE_PUMP_REQUEST', 'BRAKE_COMMAND', 0), # pump_on |
|
||||||
('CRUISE_OVERRIDE', 'BRAKE_COMMAND', 0), # pcm_override |
|
||||||
('CRUISE_FAULT_CMD', 'BRAKE_COMMAND', 0), # pcm_fault_cmd |
|
||||||
('CRUISE_CANCEL_CMD', 'BRAKE_COMMAND', 0), # pcm_cancel_cmd |
|
||||||
('COMPUTER_BRAKE_REQUEST', 'BRAKE_COMMAND', 0), # brake_rq |
|
||||||
('SET_ME_0X80', 'BRAKE_COMMAND', 0), |
|
||||||
('BRAKE_LIGHTS', 'BRAKE_COMMAND', 0), # brakelights |
|
||||||
('FCW', 'BRAKE_COMMAND', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
VA = car.CarControl.HUDControl.VisualAlert |
|
||||||
|
|
||||||
for override in [True, False]: |
|
||||||
for cancel in [True, False]: |
|
||||||
for fcw in [True, False]: |
|
||||||
steps = 25 if not override and not cancel else 2 |
|
||||||
for brake in np.linspace(0., 0.95, steps): |
|
||||||
control = car.CarControl.new_message() |
|
||||||
|
|
||||||
hud = car.CarControl.HUDControl.new_message() |
|
||||||
if fcw: |
|
||||||
hud.visualAlert = VA.fcw |
|
||||||
|
|
||||||
cruise = car.CarControl.CruiseControl.new_message() |
|
||||||
cruise.cancel = cancel |
|
||||||
cruise.override = override |
|
||||||
|
|
||||||
actuators = car.CarControl.Actuators.new_message() |
|
||||||
actuators.brake = float(brake) |
|
||||||
|
|
||||||
control.enabled = True |
|
||||||
control.actuators = actuators |
|
||||||
control.hudControl = hud |
|
||||||
control.cruiseControl = cruise |
|
||||||
|
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
CI.CS.steer_not_allowed = False |
|
||||||
|
|
||||||
for _ in range(20): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
brake_command = parser.vl['BRAKE_COMMAND']['COMPUTER_BRAKE'] |
|
||||||
min_expected_brake = int(1024 / 4 * (actuators.brake - 0.02)) |
|
||||||
max_expected_brake = int(1024 / 4 * (actuators.brake + 0.02)) |
|
||||||
braking = actuators.brake > 0 |
|
||||||
|
|
||||||
braking_ok = min_expected_brake <= brake_command <= max_expected_brake |
|
||||||
if steps == 2: |
|
||||||
braking_ok = True |
|
||||||
|
|
||||||
self.assertTrue(braking_ok, msg="Car: %s, brake %.2f" % (car_name, brake)) |
|
||||||
self.assertEqual(0x80, parser.vl['BRAKE_COMMAND']['SET_ME_0X80']) |
|
||||||
self.assertEqual(braking, parser.vl['BRAKE_COMMAND']['BRAKE_PUMP_REQUEST']) |
|
||||||
self.assertEqual(braking, parser.vl['BRAKE_COMMAND']['COMPUTER_BRAKE_REQUEST']) |
|
||||||
self.assertEqual(braking, parser.vl['BRAKE_COMMAND']['BRAKE_LIGHTS']) |
|
||||||
self.assertFalse(parser.vl['BRAKE_COMMAND']['CRUISE_FAULT_CMD']) |
|
||||||
self.assertEqual(override, parser.vl['BRAKE_COMMAND']['CRUISE_OVERRIDE']) |
|
||||||
self.assertEqual(cancel, parser.vl['BRAKE_COMMAND']['CRUISE_CANCEL_CMD']) |
|
||||||
self.assertEqual(fcw, bool(parser.vl['BRAKE_COMMAND']['FCW'])) |
|
||||||
|
|
||||||
if __name__ == '__main__': |
|
||||||
unittest.main() |
|
@ -1,348 +0,0 @@ |
|||||||
#!/usr/bin/env python3 |
|
||||||
|
|
||||||
import unittest |
|
||||||
from cereal import car, log |
|
||||||
from selfdrive.car.toyota.values import CAR as TOYOTA |
|
||||||
from selfdrive.car.toyota.carcontroller import CarController |
|
||||||
from selfdrive.car.toyota.interface import CarInterface |
|
||||||
from common.realtime import sec_since_boot |
|
||||||
|
|
||||||
from selfdrive.boardd.boardd import can_list_to_can_capnp |
|
||||||
from selfdrive.config import Conversions as CV |
|
||||||
import cereal.messaging as messaging |
|
||||||
from cereal.services import service_list |
|
||||||
from opendbc.can.parser import CANParser |
|
||||||
import zmq |
|
||||||
import time |
|
||||||
import numpy as np |
|
||||||
|
|
||||||
|
|
||||||
class TestToyotaCarcontroller(unittest.TestCase): |
|
||||||
def test_fcw(self): |
|
||||||
# TODO: This message has a 0xc1 setme which is not yet checked or in the dbc file |
|
||||||
self.longMessage = True |
|
||||||
car_name = TOYOTA.RAV4 |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('FCW', 'ACC_HUD', 0), |
|
||||||
('SET_ME_X20', 'ACC_HUD', 0), |
|
||||||
('SET_ME_X10', 'ACC_HUD', 0), |
|
||||||
('SET_ME_X80', 'ACC_HUD', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
VA = car.CarControl.HUDControl.VisualAlert |
|
||||||
for fcw in [True, False]: |
|
||||||
control = car.CarControl.new_message() |
|
||||||
control.enabled = True |
|
||||||
|
|
||||||
hud = car.CarControl.HUDControl.new_message() |
|
||||||
if fcw: |
|
||||||
hud.visualAlert = VA.fcw |
|
||||||
control.hudControl = hud |
|
||||||
|
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
for _ in range(200): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertEqual(fcw, parser.vl['ACC_HUD']['FCW']) |
|
||||||
self.assertEqual(0x20, parser.vl['ACC_HUD']['SET_ME_X20']) |
|
||||||
self.assertEqual(0x10, parser.vl['ACC_HUD']['SET_ME_X10']) |
|
||||||
self.assertEqual(0x80, parser.vl['ACC_HUD']['SET_ME_X80']) |
|
||||||
|
|
||||||
def test_ui(self): |
|
||||||
self.longMessage = True |
|
||||||
car_name = TOYOTA.RAV4 |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('BARRIERS', 'LKAS_HUD', -1), |
|
||||||
('RIGHT_LINE', 'LKAS_HUD', 0), |
|
||||||
('LEFT_LINE', 'LKAS_HUD', 0), |
|
||||||
('SET_ME_X01', 'LKAS_HUD', 0), |
|
||||||
('SET_ME_X01_2', 'LKAS_HUD', 0), |
|
||||||
('LDA_ALERT', 'LKAS_HUD', -1), |
|
||||||
('SET_ME_X0C', 'LKAS_HUD', 0), |
|
||||||
('SET_ME_X2C', 'LKAS_HUD', 0), |
|
||||||
('SET_ME_X38', 'LKAS_HUD', 0), |
|
||||||
('SET_ME_X02', 'LKAS_HUD', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
VA = car.CarControl.HUDControl.VisualAlert |
|
||||||
|
|
||||||
for left_lane in [True, False]: |
|
||||||
for right_lane in [True, False]: |
|
||||||
for steer in [True, False]: |
|
||||||
control = car.CarControl.new_message() |
|
||||||
control.enabled = True |
|
||||||
|
|
||||||
hud = car.CarControl.HUDControl.new_message() |
|
||||||
if steer: |
|
||||||
hud.visualAlert = VA.steerRequired |
|
||||||
|
|
||||||
hud.leftLaneVisible = left_lane |
|
||||||
hud.rightLaneVisible = right_lane |
|
||||||
|
|
||||||
control.hudControl = hud |
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
for _ in range(200): # UI is only sent at 1Hz |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertEqual(0x0c, parser.vl['LKAS_HUD']['SET_ME_X0C']) |
|
||||||
self.assertEqual(0x2c, parser.vl['LKAS_HUD']['SET_ME_X2C']) |
|
||||||
self.assertEqual(0x38, parser.vl['LKAS_HUD']['SET_ME_X38']) |
|
||||||
self.assertEqual(0x02, parser.vl['LKAS_HUD']['SET_ME_X02']) |
|
||||||
self.assertEqual(0, parser.vl['LKAS_HUD']['BARRIERS']) |
|
||||||
self.assertEqual(1 if right_lane else 2, parser.vl['LKAS_HUD']['RIGHT_LINE']) |
|
||||||
self.assertEqual(1 if left_lane else 2, parser.vl['LKAS_HUD']['LEFT_LINE']) |
|
||||||
self.assertEqual(1, parser.vl['LKAS_HUD']['SET_ME_X01']) |
|
||||||
self.assertEqual(1, parser.vl['LKAS_HUD']['SET_ME_X01_2']) |
|
||||||
self.assertEqual(steer, parser.vl['LKAS_HUD']['LDA_ALERT']) |
|
||||||
|
|
||||||
def test_standstill_and_cancel(self): |
|
||||||
self.longMessage = True |
|
||||||
car_name = TOYOTA.RAV4 |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('RELEASE_STANDSTILL', 'ACC_CONTROL', 0), |
|
||||||
('CANCEL_REQ', 'ACC_CONTROL', 0), |
|
||||||
('SET_ME_X3', 'ACC_CONTROL', 0), |
|
||||||
('SET_ME_1', 'ACC_CONTROL', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
control = car.CarControl.new_message() |
|
||||||
control.enabled = True |
|
||||||
|
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
CI.CS.pcm_acc_status = 8 # Active |
|
||||||
CI.CS.standstill = True |
|
||||||
can_sends = CI.apply(control) |
|
||||||
|
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertEqual(0x3, parser.vl['ACC_CONTROL']['SET_ME_X3']) |
|
||||||
self.assertEqual(1, parser.vl['ACC_CONTROL']['SET_ME_1']) |
|
||||||
self.assertFalse(parser.vl['ACC_CONTROL']['RELEASE_STANDSTILL']) |
|
||||||
self.assertFalse(parser.vl['ACC_CONTROL']['CANCEL_REQ']) |
|
||||||
|
|
||||||
CI.CS.pcm_acc_status = 7 # Standstill |
|
||||||
|
|
||||||
for _ in range(10): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertTrue(parser.vl['ACC_CONTROL']['RELEASE_STANDSTILL']) |
|
||||||
|
|
||||||
cruise = car.CarControl.CruiseControl.new_message() |
|
||||||
cruise.cancel = True |
|
||||||
control.cruiseControl = cruise |
|
||||||
|
|
||||||
for _ in range(10): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
|
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
self.assertTrue(parser.vl['ACC_CONTROL']['CANCEL_REQ']) |
|
||||||
|
|
||||||
@unittest.skip("IPAS logic changed, fix test") |
|
||||||
def test_steering_ipas(self): |
|
||||||
self.longMessage = True |
|
||||||
car_name = TOYOTA.RAV4 |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
params.enableApgs = True |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
CI.CC.angle_control = True |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('SET_ME_X10', 'STEERING_IPAS', 0), |
|
||||||
('SET_ME_X40', 'STEERING_IPAS', 0), |
|
||||||
('ANGLE', 'STEERING_IPAS', 0), |
|
||||||
('STATE', 'STEERING_IPAS', 0), |
|
||||||
('DIRECTION_CMD', 'STEERING_IPAS', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for enabled in [True, False]: |
|
||||||
for steer in np.linspace(-510., 510., 25): |
|
||||||
control = car.CarControl.new_message() |
|
||||||
actuators = car.CarControl.Actuators.new_message() |
|
||||||
actuators.steerAngle = float(steer) |
|
||||||
control.enabled = enabled |
|
||||||
control.actuators = actuators |
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
CI.CS.steer_not_allowed = False |
|
||||||
|
|
||||||
for _ in range(1000 if steer < -505 else 25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
|
|
||||||
self.assertEqual(0x10, parser.vl['STEERING_IPAS']['SET_ME_X10']) |
|
||||||
self.assertEqual(0x40, parser.vl['STEERING_IPAS']['SET_ME_X40']) |
|
||||||
|
|
||||||
expected_state = 3 if enabled else 1 |
|
||||||
self.assertEqual(expected_state, parser.vl['STEERING_IPAS']['STATE']) |
|
||||||
|
|
||||||
if steer < 0: |
|
||||||
direction = 3 |
|
||||||
elif steer > 0: |
|
||||||
direction = 1 |
|
||||||
else: |
|
||||||
direction = 2 |
|
||||||
|
|
||||||
if not enabled: |
|
||||||
direction = 2 |
|
||||||
self.assertEqual(direction, parser.vl['STEERING_IPAS']['DIRECTION_CMD']) |
|
||||||
|
|
||||||
expected_steer = int(round(steer / 1.5)) * 1.5 if enabled else 0 |
|
||||||
self.assertAlmostEqual(expected_steer, parser.vl['STEERING_IPAS']['ANGLE']) |
|
||||||
|
|
||||||
sendcan.close() |
|
||||||
|
|
||||||
def test_steering(self): |
|
||||||
self.longMessage = True |
|
||||||
car_name = TOYOTA.RAV4 |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
limit = 1500 |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('STEER_REQUEST', 'STEERING_LKA', 0), |
|
||||||
('SET_ME_1', 'STEERING_LKA', 0), |
|
||||||
('STEER_TORQUE_CMD', 'STEERING_LKA', -1), |
|
||||||
('LKA_STATE', 'STEERING_LKA', -1), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for steer in np.linspace(-1., 1., 25): |
|
||||||
control = car.CarControl.new_message() |
|
||||||
actuators = car.CarControl.Actuators.new_message() |
|
||||||
actuators.steer = float(steer) |
|
||||||
control.enabled = True |
|
||||||
control.actuators = actuators |
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
CI.CS.steer_not_allowed = False |
|
||||||
CI.CS.steer_torque_motor = limit * steer |
|
||||||
|
|
||||||
# More control applies for the first one because of rate limits |
|
||||||
for _ in range(1000 if steer < -0.99 else 25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
|
|
||||||
self.assertEqual(1, parser.vl['STEERING_LKA']['SET_ME_1']) |
|
||||||
self.assertEqual(True, parser.vl['STEERING_LKA']['STEER_REQUEST']) |
|
||||||
self.assertAlmostEqual(round(steer * limit), parser.vl['STEERING_LKA']['STEER_TORQUE_CMD']) |
|
||||||
self.assertEqual(0, parser.vl['STEERING_LKA']['LKA_STATE']) |
|
||||||
|
|
||||||
sendcan.close() |
|
||||||
|
|
||||||
def test_accel(self): |
|
||||||
self.longMessage = True |
|
||||||
car_name = TOYOTA.RAV4 |
|
||||||
|
|
||||||
sendcan = messaging.pub_sock('sendcan') |
|
||||||
|
|
||||||
params = CarInterface.get_params(car_name) |
|
||||||
CI = CarInterface(params, CarController) |
|
||||||
|
|
||||||
# Get parser |
|
||||||
parser_signals = [ |
|
||||||
('ACCEL_CMD', 'ACC_CONTROL', 0), |
|
||||||
] |
|
||||||
parser = CANParser(CI.cp.dbc_name, parser_signals, [], 0, sendcan=True, tcp_addr="127.0.0.1") |
|
||||||
time.sleep(0.2) # Slow joiner syndrome |
|
||||||
|
|
||||||
for accel in np.linspace(-3., 1.5, 25): |
|
||||||
control = car.CarControl.new_message() |
|
||||||
actuators = car.CarControl.Actuators.new_message() |
|
||||||
|
|
||||||
gas = accel / 3. if accel > 0. else 0. |
|
||||||
brake = -accel / 3. if accel < 0. else 0. |
|
||||||
|
|
||||||
actuators.gas = float(gas) |
|
||||||
actuators.brake = float(brake) |
|
||||||
control.enabled = True |
|
||||||
control.actuators = actuators |
|
||||||
CI.update(control) |
|
||||||
|
|
||||||
# More control applies for the first one because of rate limits |
|
||||||
for _ in range(25): |
|
||||||
can_sends = CI.apply(control) |
|
||||||
sendcan.send(can_list_to_can_capnp(can_sends, msgtype='sendcan')) |
|
||||||
|
|
||||||
for _ in range(5): |
|
||||||
parser.update(int(sec_since_boot() * 1e9), False) |
|
||||||
time.sleep(0.01) |
|
||||||
|
|
||||||
min_accel = accel - 0.061 |
|
||||||
max_accel = accel + 0.061 |
|
||||||
sent_accel = parser.vl['ACC_CONTROL']['ACCEL_CMD'] |
|
||||||
accel_ok = min_accel <= sent_accel <= max_accel |
|
||||||
self.assertTrue(accel_ok, msg="%.2f <= %.2f <= %.2f" % (min_accel, sent_accel, max_accel)) |
|
||||||
sendcan.close() |
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__': |
|
||||||
unittest.main() |
|
Loading…
Reference in new issue