openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

106 lines
2.9 KiB

#!/usr/bin/env python2
import os
import unittest
import requests
import selfdrive.messaging as messaging
from selfdrive.can.parser import CANParser as CANParserNew
from selfdrive.can.tests.parser_old import CANParser as CANParserOld
from selfdrive.car.honda.carstate import get_can_signals
from selfdrive.car.honda.interface import CarInterface
from selfdrive.car.honda.values import CAR, DBC
from selfdrive.services import service_list
from tools.lib.logreader import LogReader
BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/"
DT = int(0.01 * 1e9) # ns
def dict_keys_differ(dict1, dict2):
keys1 = set(dict1.keys())
keys2 = set(dict2.keys())
if keys1 != keys2:
return True
for k in keys1:
keys1 = set(dict1[k].keys())
keys2 = set(dict2[k].keys())
if keys1 != keys2:
return True
return False
def dicts_vals_differ(dict1, dict2):
for k_outer in dict1.keys():
for k_inner in dict1[k_outer].keys():
if dict1[k_outer][k_inner] != dict2[k_outer][k_inner]:
return True
return False
def run_route(route):
can = messaging.pub_sock(service_list['can'].port)
CP = CarInterface.get_params(CAR.CIVIC, {})
signals, checks = get_can_signals(CP)
parser_old = CANParserOld(DBC[CP.carFingerprint]['pt'], signals, checks, 0, timeout=-1, tcp_addr="127.0.0.1")
parser_new = CANParserNew(DBC[CP.carFingerprint]['pt'], signals, checks, 0, timeout=-1, tcp_addr="127.0.0.1")
parser_string = CANParserNew(DBC[CP.carFingerprint]['pt'], signals, checks, 0, timeout=-1)
if dict_keys_differ(parser_old.vl, parser_new.vl):
return False
lr = LogReader(route + ".bz2")
route_ok = True
for msg in lr:
if msg.which() == 'can':
t = msg.logMonoTime
msg_bytes = msg.as_builder().to_bytes()
can.send(msg_bytes)
_, updated_old = parser_old.update(t, True)
_, updated_new = parser_new.update(t, True)
updated_string = parser_string.update_string(msg_bytes)
if updated_old != updated_new:
route_ok = False
print(t, "Diff in seen")
if updated_new != updated_string:
route_ok = False
print(t, "Diff in seen string")
if dicts_vals_differ(parser_old.vl, parser_new.vl):
print(t, "Diff in dict")
route_ok = False
if dicts_vals_differ(parser_new.vl, parser_string.vl):
print(t, "Diff in dict string")
route_ok = False
return route_ok
class TestCanParser(unittest.TestCase):
def setUp(self):
self.routes = {
CAR.CIVIC: "b0c9d2329ad1606b|2019-05-30--20-23-57"
}
for route in self.routes.values():
route_filename = route + ".bz2"
if not os.path.isfile(route_filename):
with open(route + ".bz2", "w") as f:
f.write(requests.get(BASE_URL + route_filename).content)
def test_parser_civic(self):
self.assertTrue(run_route(self.routes[CAR.CIVIC]))
if __name__ == "__main__":
unittest.main()