#!/usr/bin/env python3
# Loopback test between black panda (+ harness and power) and white/grey panda
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test.
# To be sure, the test should be run with both harness orientations
import os
import time
import random
import argparse
from opendbc . car . structs import CarParams
from panda import Panda
def get_test_string() -> bytes:
    """Build a fresh CAN test payload.

    Returns:
        The fixed ``b"test"`` marker followed by 10 bytes from ``os.urandom``.
    """
    random_suffix = os.urandom(10)
    return b"".join((b"test", random_suffix))
# Running totals shared across the test functions through `global` statements.
counter: int = 0  # completed test cycles (both directions tested once)
nonzero_bus_errors: int = 0  # message looped back, but not on the expected set of buses
zero_bus_errors: int = 0  # buses were expected, but no message looped back at all
content_errors: int = 0  # a received message's address or data differed from what was sent
def run_test(sleep_duration):
    """Run the black <-> white/grey panda CAN loopback test.

    Connects to the two attached pandas, works out which one is the black
    panda, switches both to the ``allOutput`` safety mode and then cycles
    through the bus tests in both directions indefinitely. Every six hours
    the black panda is switched to ``silent`` and back to ``allOutput``.

    This function never returns normally: the test loop has no exit
    condition, so it only ends when an exception (for example a failed
    assertion in ``test_buses`` or a ``KeyboardInterrupt``) propagates out.

    Args:
        sleep_duration: Seconds to sleep after each individual bus test;
            passed straight through to ``test_buses``.

    Raises:
        Exception: If the number of connected pandas is not exactly two, or
            if it is not the case that exactly one of them is a black panda.
    """
    global counter

    pandas = Panda.list()
    print(pandas)

    # make sure two pandas are connected
    if len(pandas) != 2:
        raise Exception("Connect white/grey and black panda to run this test!")

    # connect
    first = Panda(pandas[0])
    second = Panda(pandas[1])

    # find out which one is black; exactly one of the two must be
    first_is_black = bool(first.is_black())
    second_is_black = bool(second.is_black())
    if first_is_black == second_is_black:
        raise Exception("Connect white/grey and black panda to run this test!")
    if first_is_black:
        black_panda, other_panda = first, second
    else:
        black_panda, other_panda = second, first

    # disable safety modes
    black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
    other_panda.set_safety_mode(CarParams.SafetyModel.allOutput)

    # test health packet
    print("black panda health", black_panda.health())
    print("other panda health", other_panda.health())

    # (send_bus, obd, expected receive buses) cases for each direction
    black_to_other = [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])]
    other_to_black = [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])]

    relay_toggle_interval = 3600 * 6  # seconds

    # Use the monotonic clock for elapsed-time measurement: unlike time.time()
    # it cannot jump when the system clock is adjusted during a long run.
    start_time = time.monotonic()
    last_toggle_time = start_time
    while True:
        test_buses(black_panda, other_panda, True, black_to_other, sleep_duration)
        test_buses(black_panda, other_panda, False, other_to_black, sleep_duration)
        counter += 1

        runtime = time.monotonic() - start_time
        print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors,
              "Content errors:", content_errors, "Runtime:", runtime)

        if (time.monotonic() - last_toggle_time) > relay_toggle_interval:
            # Toggle relay
            black_panda.set_safety_mode(CarParams.SafetyModel.silent)
            time.sleep(1)
            black_panda.set_safety_mode(CarParams.SafetyModel.allOutput)
            time.sleep(1)
            last_toggle_time = time.monotonic()
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration):
    """Send one random CAN message per test case and check where it loops back.

    For every case the sending panda transmits a message with a random
    address and payload, and the receiving panda's buffer is then checked:
    each received message must carry the sent address and data, and the set
    of buses it arrived on must match the expected buses. Mismatches are
    tallied in the module-level error counters.

    Args:
        black_panda: The connected black panda.
        other_panda: The connected white/grey panda.
        direction: Truthy to send from the black panda to the other panda,
            falsy to send from the other panda to the black panda.
        test_array: Iterable of ``(send_bus, obd, recv_buses)`` tuples: the
            bus to transmit on, the OBD setting applied to the black panda,
            and the list of buses the message is expected to arrive on.
            The lists are not modified.
        sleep_duration: Seconds to sleep after each test case.

    Raises:
        AssertionError: If nothing is received or the receive buses do not
            match, unless the ``NOASSERT`` environment variable is set to a
            non-empty value.
    """
    global nonzero_bus_errors, zero_bus_errors, content_errors

    if direction:
        print("***************** TESTING (BLACK --> OTHER) *****************")
        sender, receiver = black_panda, other_panda
    else:
        print("***************** TESTING (OTHER --> BLACK) *****************")
        sender, receiver = other_panda, black_panda

    for send_bus, obd, recv_buses in test_array:
        black_panda.send_heartbeat()
        other_panda.send_heartbeat()
        print("\ntest can:", send_bus, "OBD:", obd)

        # set OBD on black panda
        black_panda.set_obd(True if obd else None)

        # clear the buses involved on each side ...
        sender.can_clear(send_bus)
        for recv_bus in recv_buses:
            receiver.can_clear(recv_bus)

        # ... and flush anything still pending on both pandas
        black_panda.can_recv()
        other_panda.can_recv()

        # send the characters
        at = random.randint(1, 2000)
        st = get_test_string()[0:8]
        sender.can_send(at, st, send_bus)
        time.sleep(0.1)

        # check for receive
        sender.can_recv()  # cans echo; read and discarded
        cans_loop = receiver.can_recv()

        loop_buses = []
        for loop in cans_loop:
            # each entry is indexed as (address, data, bus)
            if (loop[0] != at) or (loop[1] != st):
                content_errors += 1

            print("Loop on bus", str(loop[2]))
            loop_buses.append(loop[2])
        if not loop_buses:  # one entry per received message, so empty == nothing received
            print("No loop")
            assert os.getenv("NOASSERT")

        # test loop buses; compare sorted copies so the caller's lists stay untouched
        if sorted(recv_buses) != sorted(loop_buses):
            if not loop_buses:
                zero_bus_errors += 1
            else:
                nonzero_bus_errors += 1
            assert os.getenv("NOASSERT")
        else:
            print("TEST PASSED")

        time.sleep(sleep_duration)
        print("\n")
if __name__ == "__main__":
    # Command-line entry point: parse the options and start the loopback test.
    cli = argparse.ArgumentParser()
    cli.add_argument("-n", type=int, help="Number of test iterations to run")
    cli.add_argument("-sleep", type=int, help="Sleep time between tests", default=0)
    args = cli.parse_args()

    # `remaining` is None when -n was not given, meaning "repeat forever".
    # NOTE: run_test() contains its own endless loop, so in practice a call
    # only ends by raising.
    remaining = args.n
    while remaining is None or remaining > 0:
        run_test(sleep_duration=args.sleep)
        if remaining is not None:
            remaining -= 1