openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

153 lines
4.9 KiB

#!/usr/bin/env python3
import os
import time
import random
from collections import defaultdict
from opendbc.car.structs import CarParams
from panda import Panda, calculate_checksum, DLC_TO_LEN
from panda import PandaJungle
from panda.tests.hitl.helpers import time_many_sends
H7_HW_TYPES = [Panda.HW_TYPE_RED_PANDA, Panda.HW_TYPE_RED_PANDA_V2]
JUNGLE_SERIAL = os.getenv("JUNGLE")
H7_PANDAS_EXCLUDE = [] # type: ignore
if os.getenv("H7_PANDAS_EXCLUDE"):
H7_PANDAS_EXCLUDE = os.getenv("H7_PANDAS_EXCLUDE").strip().split(" ") # type: ignore
def panda_reset():
panda_serials = []
panda_jungle = PandaJungle(JUNGLE_SERIAL)
panda_jungle.set_can_silent(True)
panda_jungle.set_panda_power(False)
time.sleep(1)
panda_jungle.set_panda_power(True)
time.sleep(4)
for serial in Panda.list():
if serial not in H7_PANDAS_EXCLUDE:
with Panda(serial=serial) as p:
if p.get_type() in H7_HW_TYPES:
p.reset()
panda_serials.append(serial)
print("test pandas", panda_serials)
assert len(panda_serials) == 2, "Two H7 pandas required"
return panda_serials
def panda_init(serial, enable_canfd=False, enable_non_iso=False):
p = Panda(serial=serial)
p.set_power_save(False)
for bus in range(3):
p.set_can_speed_kbps(0, 500)
if enable_canfd:
p.set_can_data_speed_kbps(bus, 2000)
if enable_non_iso:
p.set_canfd_non_iso(bus, True)
p.set_safety_mode(CarParams.SafetyModel.allOutput)
return p
def test_canfd_throughput(p, p_recv=None):
two_pandas = p_recv is not None
p.set_safety_mode(CarParams.SafetyModel.allOutput)
if two_pandas:
p_recv.set_safety_mode(CarParams.SafetyModel.allOutput)
# enable output mode
else:
p.set_can_loopback(True)
tests = [
[500, 1000, 2000], # speeds
[93, 87, 78], # saturation thresholds
]
for i in range(len(tests[0])):
# set bus 0 data speed to speed
p.set_can_data_speed_kbps(0, tests[0][i])
if p_recv is not None:
p_recv.set_can_data_speed_kbps(0, tests[0][i])
time.sleep(0.05)
comp_kbps = time_many_sends(p, 0, p_recv=p_recv, msg_count=400, two_pandas=two_pandas, msg_len=64)
# bit count from https://en.wikipedia.org/wiki/CAN_bus
saturation_pct = (comp_kbps / tests[0][i]) * 100.0
assert saturation_pct > tests[1][i]
assert saturation_pct < 100
def canfd_test(p_send, p_recv):
for n in range(100):
sent_msgs = defaultdict(set)
to_send = []
for _ in range(200):
bus = random.randrange(3)
for dlc in range(len(DLC_TO_LEN)):
address = random.randrange(1, 1<<29)
data = bytearray(random.getrandbits(8) for _ in range(DLC_TO_LEN[dlc]))
if len(data) >= 2:
data[0] = calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8"))
to_send.append([address, data, bus])
sent_msgs[bus].add((address, bytes(data)))
p_send.can_send_many(to_send, timeout=0)
start_time = time.monotonic()
while (time.monotonic() - start_time < 1) and any(len(x) > 0 for x in sent_msgs.values()):
incoming = p_recv.can_recv()
for msg in incoming:
address, data, bus = msg
if len(data) >= 2:
assert calculate_checksum(data[1:] + bytes(str(address), encoding="utf-8")) == data[0]
k = (address, bytes(data))
assert k in sent_msgs[bus], f"message {k} was never sent on bus {bus}"
sent_msgs[bus].discard(k)
for bus in range(3):
assert not len(sent_msgs[bus]), f"loop {n}: bus {bus} missing {len(sent_msgs[bus])} messages"
def setup_test(enable_non_iso=False):
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=False, enable_non_iso=enable_non_iso)
p_recv = panda_init(panda_serials[1], enable_canfd=True, enable_non_iso=enable_non_iso)
# Check that sending panda CAN FD and BRS are turned off
for bus in range(3):
health = p_send.can_health(bus)
assert not health["canfd_enabled"]
assert not health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
# Receiving panda sends dummy CAN FD message that should enable CAN FD on sender side
for bus in range(3):
p_recv.can_send(0x200, b"dummymessage", bus)
p_recv.can_recv()
p_send.can_recv()
# Check if all tested buses on sending panda have swithed to CAN FD with BRS
for bus in range(3):
health = p_send.can_health(bus)
assert health["canfd_enabled"]
assert health["brs_enabled"]
assert health["canfd_non_iso"] == enable_non_iso
return p_send, p_recv
def main():
print("[TEST CAN-FD]")
p_send, p_recv = setup_test()
canfd_test(p_send, p_recv)
print("[TEST CAN-FD non-ISO]")
p_send, p_recv = setup_test(enable_non_iso=True)
canfd_test(p_send, p_recv)
print("[TEST CAN-FD THROUGHPUT]")
panda_serials = panda_reset()
p_send = panda_init(panda_serials[0], enable_canfd=True)
p_recv = panda_init(panda_serials[1], enable_canfd=True)
test_canfd_throughput(p_send, p_recv)
if __name__ == "__main__":
main()