9881e6118 Panda for Mazda (#165)
9a15d2f5b fix version newline
a8ed7d219 add subaru outback/legacy to subaru safety (#259)
bdeb1c953 mazda is #12
14ea4d2e0 merge safety gm in a single file
bf1ef875e Add GM passive safety mode (#266)
c131fffae fix canflash for pedal (#267)
3397b1527 only allow bootloader entry on debug builds
d68356b92 Honda Nidec: fwd stock AEB (#257)
6f532c6d5 Black panda Jenkins (#256)
d68508c79 Gpio race condition fix (#263)
d69d05fc0 Fixed pedal not initializing (#262)
36067e01c Honda safety: fixed incorrect brake decoding. Due to the specific limit of 255, this change does not affect the safety behavior
git-subtree-dir: panda
git-subtree-split: 9881e61184ad0417e9e080767f09585a9c777621
old-commit-hash: 876256a268
commatwo_master
parent
229b51e5d0
commit
4efb208892
38 changed files with 1310 additions and 445 deletions
@ -0,0 +1,169 @@ |
||||
|
||||
// CAN msgs we care about
|
||||
#define MAZDA_LKAS 0x243 |
||||
#define MAZDA_LANEINFO 0x440 |
||||
#define MAZDA_CRZ_CTRL 0x21c |
||||
#define MAZDA_WHEEL_SPEED 0x215 |
||||
#define MAZDA_STEER_TORQUE 0x240 |
||||
|
||||
// CAN bus numbers
|
||||
#define MAZDA_MAIN 0 |
||||
#define MAZDA_AUX 1 |
||||
#define MAZDA_CAM 2 |
||||
|
||||
#define MAZDA_MAX_STEER 2048 |
||||
|
||||
// max delta torque allowed for real time checks
|
||||
#define MAZDA_MAX_RT_DELTA 940 |
||||
// 250ms between real time checks
|
||||
#define MAZDA_RT_INTERVAL 250000 |
||||
#define MAZDA_MAX_RATE_UP 10 |
||||
#define MAZDA_MAX_RATE_DOWN 25 |
||||
#define MAZDA_DRIVER_TORQUE_ALLOWANCE 15 |
||||
#define MAZDA_DRIVER_TORQUE_FACTOR 1 |
||||
|
||||
|
||||
int mazda_cruise_engaged_last = 0; |
||||
int mazda_rt_torque_last = 0; |
||||
int mazda_desired_torque_last = 0; |
||||
uint32_t mazda_ts_last = 0; |
||||
struct sample_t mazda_torque_driver; // last few driver torques measured
|
||||
|
||||
// track msgs coming from OP so that we know what CAM msgs to drop and what to forward
|
||||
int mazda_op_lkas_detected = 0; |
||||
int mazda_op_laneinfo_detected = 0; |
||||
|
||||
int mazda_forward_cam = 0; |
||||
int mazda_giraffe_switch_2_on = 0; |
||||
|
||||
void mazda_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) { |
||||
int bus = GET_BUS(to_push); |
||||
int addr = GET_ADDR(to_push); |
||||
|
||||
if ((addr == MAZDA_STEER_TORQUE) && (bus == MAZDA_MAIN)) { |
||||
int torque_driver_new = GET_BYTE(to_push, 0) - 127; |
||||
// update array of samples
|
||||
update_sample(&mazda_torque_driver, torque_driver_new); |
||||
} |
||||
|
||||
// enter controls on rising edge of ACC, exit controls on ACC off
|
||||
if ((addr == MAZDA_CRZ_CTRL) && (bus == MAZDA_MAIN)) { |
||||
int cruise_engaged = GET_BYTE(to_push, 0) & 8; |
||||
if (cruise_engaged != 0) { |
||||
if (!mazda_cruise_engaged_last) { |
||||
controls_allowed = 1; |
||||
} |
||||
} |
||||
else { |
||||
controls_allowed = 0; |
||||
} |
||||
mazda_cruise_engaged_last = cruise_engaged; |
||||
} |
||||
|
||||
// we have msgs on bus MAZDA_CAM
|
||||
if (bus == MAZDA_CAM) { |
||||
// the stock CAM is connected
|
||||
if (addr == MAZDA_LKAS) { |
||||
mazda_forward_cam = 1; |
||||
} |
||||
// if we see wheel speed msgs on MAZDA_CAM bus then giraffe switch 2 is high
|
||||
// (hardware passthru)
|
||||
if (addr == MAZDA_WHEEL_SPEED) { |
||||
mazda_giraffe_switch_2_on = 1; |
||||
} |
||||
} |
||||
} |
||||
|
||||
static int mazda_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) { |
||||
int tx = 1; |
||||
int addr = GET_ADDR(to_send); |
||||
int bus = GET_BUS(to_send); |
||||
|
||||
// Check if msg is sent on the main BUS
|
||||
if (bus == MAZDA_MAIN) { |
||||
if ((addr == MAZDA_LKAS) && !mazda_op_lkas_detected){ |
||||
mazda_op_lkas_detected = 1; |
||||
} |
||||
if ((addr == MAZDA_LANEINFO) && !mazda_op_laneinfo_detected){ |
||||
mazda_op_laneinfo_detected = 1; |
||||
} |
||||
|
||||
// steer cmd checks
|
||||
if (addr == MAZDA_LKAS) { |
||||
int desired_torque = (((GET_BYTE(to_send, 0) & 0x0f) << 8) | GET_BYTE(to_send, 1)) - MAZDA_MAX_STEER; |
||||
bool violation = 0; |
||||
uint32_t ts = TIM2->CNT; |
||||
|
||||
if (controls_allowed) { |
||||
|
||||
// *** global torque limit check ***
|
||||
violation |= max_limit_check(desired_torque, MAZDA_MAX_STEER, -MAZDA_MAX_STEER); |
||||
|
||||
// *** torque rate limit check ***
|
||||
int desired_torque_last = mazda_desired_torque_last; |
||||
violation |= driver_limit_check(desired_torque, desired_torque_last, &mazda_torque_driver, |
||||
MAZDA_MAX_STEER, MAZDA_MAX_RATE_UP, MAZDA_MAX_RATE_DOWN, |
||||
MAZDA_DRIVER_TORQUE_ALLOWANCE, MAZDA_DRIVER_TORQUE_FACTOR); |
||||
// used next time
|
||||
mazda_desired_torque_last = desired_torque; |
||||
|
||||
// *** torque real time rate limit check ***
|
||||
violation |= rt_rate_limit_check(desired_torque, mazda_rt_torque_last, MAZDA_MAX_RT_DELTA); |
||||
|
||||
// every RT_INTERVAL set the new limits
|
||||
uint32_t ts_elapsed = get_ts_elapsed(ts, mazda_ts_last); |
||||
if (ts_elapsed > ((uint32_t) MAZDA_RT_INTERVAL)) { |
||||
mazda_rt_torque_last = desired_torque; |
||||
mazda_ts_last = ts; |
||||
} |
||||
} |
||||
|
||||
// no torque if controls is not allowed
|
||||
if (!controls_allowed && (desired_torque != 0)) { |
||||
violation = 1; |
||||
} |
||||
|
||||
// reset to 0 if either controls is not allowed or there's a violation
|
||||
if (violation || !controls_allowed) { |
||||
mazda_desired_torque_last = 0; |
||||
mazda_rt_torque_last = 0; |
||||
mazda_ts_last = ts; |
||||
} |
||||
|
||||
if (violation) { |
||||
tx = 0; |
||||
} |
||||
} |
||||
} |
||||
return tx; |
||||
} |
||||
|
||||
static int mazda_fwd_hook(int bus, CAN_FIFOMailBox_TypeDef *to_fwd) { |
||||
int bus_fwd = -1; |
||||
if (mazda_forward_cam && !mazda_giraffe_switch_2_on) { |
||||
int addr = GET_ADDR(to_fwd); |
||||
if (bus == MAZDA_MAIN) { |
||||
bus_fwd = MAZDA_CAM; |
||||
} |
||||
else if (bus == MAZDA_CAM) { |
||||
// drop stock CAM_LKAS and CAM_LANEINFI if OP is sending them
|
||||
if (!((addr == MAZDA_LKAS) && mazda_op_lkas_detected) && |
||||
!((addr == MAZDA_LANEINFO) && mazda_op_laneinfo_detected)) { |
||||
bus_fwd = MAZDA_MAIN; |
||||
} |
||||
} |
||||
else { |
||||
bus_fwd = -1; |
||||
} |
||||
} |
||||
return bus_fwd; |
||||
} |
||||
|
||||
const safety_hooks mazda_hooks = { |
||||
.init = nooutput_init, |
||||
.rx = mazda_rx_hook, |
||||
.tx = mazda_tx_hook, |
||||
.tx_lin = nooutput_tx_lin_hook, |
||||
.ignition = default_ign_hook, |
||||
.fwd = mazda_fwd_hook, |
||||
}; |
@ -1,15 +1,13 @@ |
||||
import os |
||||
from panda import Panda |
||||
from helpers import panda_color_to_serial, test_white_and_grey |
||||
from helpers import panda_type_to_serial, test_white_and_grey, test_all_pandas, panda_connect_and_init |
||||
|
||||
@test_white_and_grey |
||||
@panda_color_to_serial |
||||
def test_recover(serial=None): |
||||
p = Panda(serial=serial) |
||||
@test_all_pandas |
||||
@panda_connect_and_init |
||||
def test_recover(p): |
||||
assert p.recover(timeout=30) |
||||
|
||||
@test_white_and_grey |
||||
@panda_color_to_serial |
||||
def test_flash(serial=None): |
||||
p = Panda(serial=serial) |
||||
@test_all_pandas |
||||
@panda_connect_and_init |
||||
def test_flash(p): |
||||
p.flash() |
||||
|
@ -0,0 +1,169 @@ |
||||
#!/usr/bin/env python |
||||
|
||||
# Loopback test between black panda (+ harness and power) and white/grey panda |
||||
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test. |
||||
# To be sure, the test should be run with both harness orientations |
||||
|
||||
from __future__ import print_function |
||||
import os |
||||
import sys |
||||
import time |
||||
import random |
||||
import argparse |
||||
|
||||
from hexdump import hexdump |
||||
from itertools import permutations |
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) |
||||
from panda import Panda |
||||
|
||||
def get_test_string(): |
||||
return b"test"+os.urandom(10) |
||||
|
||||
counter = 0 |
||||
nonzero_bus_errors = 0 |
||||
zero_bus_errors = 0 |
||||
content_errors = 0 |
||||
|
||||
def run_test(sleep_duration): |
||||
global counter, nonzero_bus_errors, zero_bus_errors, content_errors |
||||
|
||||
pandas = Panda.list() |
||||
print(pandas) |
||||
|
||||
# make sure two pandas are connected |
||||
if len(pandas) != 2: |
||||
print("Connect white/grey and black panda to run this test!") |
||||
assert False |
||||
|
||||
# connect |
||||
pandas[0] = Panda(pandas[0]) |
||||
pandas[1] = Panda(pandas[1]) |
||||
|
||||
# find out which one is black |
||||
type0 = pandas[0].get_type() |
||||
type1 = pandas[1].get_type() |
||||
|
||||
black_panda = None |
||||
other_panda = None |
||||
|
||||
if type0 == "\x03" and type1 != "\x03": |
||||
black_panda = pandas[0] |
||||
other_panda = pandas[1] |
||||
elif type0 != "\x03" and type1 == "\x03": |
||||
black_panda = pandas[1] |
||||
other_panda = pandas[0] |
||||
else: |
||||
print("Connect white/grey and black panda to run this test!") |
||||
assert False |
||||
|
||||
# disable safety modes |
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
|
||||
# test health packet |
||||
print("black panda health", black_panda.health()) |
||||
print("other panda health", other_panda.health()) |
||||
|
||||
# test black -> other |
||||
while True: |
||||
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration) |
||||
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration) |
||||
counter += 1 |
||||
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors) |
||||
|
||||
# Toggle relay |
||||
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) |
||||
time.sleep(1) |
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
time.sleep(1) |
||||
|
||||
|
||||
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): |
||||
global nonzero_bus_errors, zero_bus_errors, content_errors |
||||
|
||||
if direction: |
||||
print("***************** TESTING (BLACK --> OTHER) *****************") |
||||
else: |
||||
print("***************** TESTING (OTHER --> BLACK) *****************") |
||||
|
||||
for send_bus, obd, recv_buses in test_array: |
||||
black_panda.send_heartbeat() |
||||
other_panda.send_heartbeat() |
||||
print("\ntest can: ", send_bus, " OBD: ", obd) |
||||
|
||||
# set OBD on black panda |
||||
black_panda.set_gmlan(True if obd else None) |
||||
|
||||
# clear and flush |
||||
if direction: |
||||
black_panda.can_clear(send_bus) |
||||
else: |
||||
other_panda.can_clear(send_bus) |
||||
|
||||
for recv_bus in recv_buses: |
||||
if direction: |
||||
other_panda.can_clear(recv_bus) |
||||
else: |
||||
black_panda.can_clear(recv_bus) |
||||
|
||||
black_panda.can_recv() |
||||
other_panda.can_recv() |
||||
|
||||
# send the characters |
||||
at = random.randint(1, 2000) |
||||
st = get_test_string()[0:8] |
||||
if direction: |
||||
black_panda.can_send(at, st, send_bus) |
||||
else: |
||||
other_panda.can_send(at, st, send_bus) |
||||
time.sleep(0.1) |
||||
|
||||
# check for receive |
||||
if direction: |
||||
cans_echo = black_panda.can_recv() |
||||
cans_loop = other_panda.can_recv() |
||||
else: |
||||
cans_echo = other_panda.can_recv() |
||||
cans_loop = black_panda.can_recv() |
||||
|
||||
loop_buses = [] |
||||
for loop in cans_loop: |
||||
if (loop[0] != at) or (loop[2] != st): |
||||
content_errors += 1 |
||||
|
||||
print(" Loop on bus", str(loop[3])) |
||||
loop_buses.append(loop[3]) |
||||
if len(cans_loop) == 0: |
||||
print(" No loop") |
||||
if not os.getenv("NOASSERT"): |
||||
assert False |
||||
|
||||
# test loop buses |
||||
recv_buses.sort() |
||||
loop_buses.sort() |
||||
if(recv_buses != loop_buses): |
||||
if len(loop_buses) == 0: |
||||
zero_bus_errors += 1 |
||||
else: |
||||
nonzero_bus_errors += 1 |
||||
if not os.getenv("NOASSERT"): |
||||
assert False |
||||
else: |
||||
print(" TEST PASSED") |
||||
|
||||
time.sleep(sleep_duration) |
||||
print("\n") |
||||
|
||||
if __name__ == "__main__": |
||||
parser = argparse.ArgumentParser() |
||||
parser.add_argument("-n", type=int, help="Number of test iterations to run") |
||||
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) |
||||
args = parser.parse_args() |
||||
|
||||
if args.n is None: |
||||
while True: |
||||
run_test(sleep_duration=args.sleep) |
||||
else: |
||||
for i in range(args.n): |
||||
run_test(sleep_duration=args.sleep) |
@ -0,0 +1,175 @@ |
||||
#!/usr/bin/env python |
||||
|
||||
# Loopback test between black panda (+ harness and power) and white/grey panda |
||||
# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test. |
||||
# To be sure, the test should be run with both harness orientations |
||||
|
||||
from __future__ import print_function |
||||
import os |
||||
import sys |
||||
import time |
||||
import random |
||||
import argparse |
||||
|
||||
from hexdump import hexdump |
||||
from itertools import permutations |
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) |
||||
from panda import Panda |
||||
|
||||
def get_test_string(): |
||||
return b"test"+os.urandom(10) |
||||
|
||||
counter = 0 |
||||
nonzero_bus_errors = 0 |
||||
zero_bus_errors = 0 |
||||
content_errors = 0 |
||||
|
||||
def run_test(sleep_duration): |
||||
global counter, nonzero_bus_errors, zero_bus_errors, content_errors |
||||
|
||||
pandas = Panda.list() |
||||
print(pandas) |
||||
|
||||
# make sure two pandas are connected |
||||
if len(pandas) != 2: |
||||
print("Connect white/grey and black panda to run this test!") |
||||
assert False |
||||
|
||||
# connect |
||||
pandas[0] = Panda(pandas[0]) |
||||
pandas[1] = Panda(pandas[1]) |
||||
|
||||
# find out which one is black |
||||
type0 = pandas[0].get_type() |
||||
type1 = pandas[1].get_type() |
||||
|
||||
black_panda = None |
||||
other_panda = None |
||||
|
||||
if type0 == "\x03" and type1 != "\x03": |
||||
black_panda = pandas[0] |
||||
other_panda = pandas[1] |
||||
elif type0 != "\x03" and type1 == "\x03": |
||||
black_panda = pandas[1] |
||||
other_panda = pandas[0] |
||||
else: |
||||
print("Connect white/grey and black panda to run this test!") |
||||
assert False |
||||
|
||||
# disable safety modes |
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
|
||||
# test health packet |
||||
print("black panda health", black_panda.health()) |
||||
print("other panda health", other_panda.health()) |
||||
|
||||
# test black -> other |
||||
start_time = time.time() |
||||
temp_start_time = start_time |
||||
while True: |
||||
test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration) |
||||
test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration) |
||||
counter += 1 |
||||
|
||||
runtime = time.time() - start_time |
||||
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors, "Runtime: ", runtime) |
||||
|
||||
if (time.time() - temp_start_time) > 3600*6: |
||||
# Toggle relay |
||||
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) |
||||
time.sleep(1) |
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
time.sleep(1) |
||||
temp_start_time = time.time() |
||||
|
||||
|
||||
def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): |
||||
global nonzero_bus_errors, zero_bus_errors, content_errors |
||||
|
||||
if direction: |
||||
print("***************** TESTING (BLACK --> OTHER) *****************") |
||||
else: |
||||
print("***************** TESTING (OTHER --> BLACK) *****************") |
||||
|
||||
for send_bus, obd, recv_buses in test_array: |
||||
black_panda.send_heartbeat() |
||||
other_panda.send_heartbeat() |
||||
print("\ntest can: ", send_bus, " OBD: ", obd) |
||||
|
||||
# set OBD on black panda |
||||
black_panda.set_gmlan(True if obd else None) |
||||
|
||||
# clear and flush |
||||
if direction: |
||||
black_panda.can_clear(send_bus) |
||||
else: |
||||
other_panda.can_clear(send_bus) |
||||
|
||||
for recv_bus in recv_buses: |
||||
if direction: |
||||
other_panda.can_clear(recv_bus) |
||||
else: |
||||
black_panda.can_clear(recv_bus) |
||||
|
||||
black_panda.can_recv() |
||||
other_panda.can_recv() |
||||
|
||||
# send the characters |
||||
at = random.randint(1, 2000) |
||||
st = get_test_string()[0:8] |
||||
if direction: |
||||
black_panda.can_send(at, st, send_bus) |
||||
else: |
||||
other_panda.can_send(at, st, send_bus) |
||||
time.sleep(0.1) |
||||
|
||||
# check for receive |
||||
if direction: |
||||
cans_echo = black_panda.can_recv() |
||||
cans_loop = other_panda.can_recv() |
||||
else: |
||||
cans_echo = other_panda.can_recv() |
||||
cans_loop = black_panda.can_recv() |
||||
|
||||
loop_buses = [] |
||||
for loop in cans_loop: |
||||
if (loop[0] != at) or (loop[2] != st): |
||||
content_errors += 1 |
||||
|
||||
print(" Loop on bus", str(loop[3])) |
||||
loop_buses.append(loop[3]) |
||||
if len(cans_loop) == 0: |
||||
print(" No loop") |
||||
if not os.getenv("NOASSERT"): |
||||
assert False |
||||
|
||||
# test loop buses |
||||
recv_buses.sort() |
||||
loop_buses.sort() |
||||
if(recv_buses != loop_buses): |
||||
if len(loop_buses) == 0: |
||||
zero_bus_errors += 1 |
||||
else: |
||||
nonzero_bus_errors += 1 |
||||
if not os.getenv("NOASSERT"): |
||||
assert False |
||||
else: |
||||
print(" TEST PASSED") |
||||
|
||||
time.sleep(sleep_duration) |
||||
print("\n") |
||||
|
||||
if __name__ == "__main__": |
||||
parser = argparse.ArgumentParser() |
||||
parser.add_argument("-n", type=int, help="Number of test iterations to run") |
||||
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) |
||||
args = parser.parse_args() |
||||
|
||||
if args.n is None: |
||||
while True: |
||||
run_test(sleep_duration=args.sleep) |
||||
else: |
||||
for i in range(args.n): |
||||
run_test(sleep_duration=args.sleep) |
@ -0,0 +1,145 @@ |
||||
#!/usr/bin/env python |
||||
|
||||
# Relay test with loopback between black panda (+ harness and power) and white/grey panda |
||||
# Tests the relay switching multiple times / second by looking at the buses on which loop occurs. |
||||
|
||||
from __future__ import print_function |
||||
import os |
||||
import sys |
||||
import time |
||||
import random |
||||
import argparse |
||||
|
||||
from hexdump import hexdump |
||||
from itertools import permutations |
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) |
||||
from panda import Panda |
||||
|
||||
def get_test_string(): |
||||
return b"test"+os.urandom(10) |
||||
|
||||
counter = 0 |
||||
open_errors = 0 |
||||
closed_errors = 0 |
||||
content_errors = 0 |
||||
|
||||
def run_test(sleep_duration): |
||||
global counter, open_errors, closed_errors, content_errors |
||||
|
||||
pandas = Panda.list() |
||||
#pandas = ["540046000c51363338383037", "07801b800f51363038363036"] |
||||
print(pandas) |
||||
|
||||
# make sure two pandas are connected |
||||
if len(pandas) != 2: |
||||
print("Connect white/grey and black panda to run this test!") |
||||
assert False |
||||
|
||||
# connect |
||||
pandas[0] = Panda(pandas[0]) |
||||
pandas[1] = Panda(pandas[1]) |
||||
|
||||
# find out which one is black |
||||
type0 = pandas[0].get_type() |
||||
type1 = pandas[1].get_type() |
||||
|
||||
black_panda = None |
||||
other_panda = None |
||||
|
||||
if type0 == "\x03" and type1 != "\x03": |
||||
black_panda = pandas[0] |
||||
other_panda = pandas[1] |
||||
elif type0 != "\x03" and type1 == "\x03": |
||||
black_panda = pandas[1] |
||||
other_panda = pandas[0] |
||||
else: |
||||
print("Connect white/grey and black panda to run this test!") |
||||
assert False |
||||
|
||||
# disable safety modes |
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
|
||||
# test health packet |
||||
print("black panda health", black_panda.health()) |
||||
print("other panda health", other_panda.health()) |
||||
|
||||
# test black -> other |
||||
while True: |
||||
# Switch on relay |
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||
time.sleep(0.05) |
||||
|
||||
if not test_buses(black_panda, other_panda, (0, False, [0])): |
||||
open_errors += 1 |
||||
print("Open error") |
||||
assert False |
||||
|
||||
# Switch off relay |
||||
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) |
||||
time.sleep(0.05) |
||||
|
||||
if not test_buses(black_panda, other_panda, (0, False, [0, 2])): |
||||
closed_errors += 1 |
||||
print("Close error") |
||||
assert False |
||||
|
||||
counter += 1 |
||||
print("Number of cycles:", counter, "Open errors:", open_errors, "Closed errors:", closed_errors, "Content errors:", content_errors) |
||||
|
||||
def test_buses(black_panda, other_panda, test_obj): |
||||
global content_errors |
||||
send_bus, obd, recv_buses = test_obj |
||||
|
||||
black_panda.send_heartbeat() |
||||
other_panda.send_heartbeat() |
||||
|
||||
# Set OBD on send panda |
||||
other_panda.set_gmlan(True if obd else None) |
||||
|
||||
# clear and flush |
||||
other_panda.can_clear(send_bus) |
||||
|
||||
for recv_bus in recv_buses: |
||||
black_panda.can_clear(recv_bus) |
||||
|
||||
black_panda.can_recv() |
||||
other_panda.can_recv() |
||||
|
||||
# send the characters |
||||
at = random.randint(1, 2000) |
||||
st = get_test_string()[0:8] |
||||
other_panda.can_send(at, st, send_bus) |
||||
time.sleep(0.05) |
||||
|
||||
# check for receive |
||||
cans_echo = other_panda.can_recv() |
||||
cans_loop = black_panda.can_recv() |
||||
|
||||
loop_buses = [] |
||||
for loop in cans_loop: |
||||
if (loop[0] != at) or (loop[2] != st): |
||||
content_errors += 1 |
||||
loop_buses.append(loop[3]) |
||||
|
||||
# test loop buses |
||||
recv_buses.sort() |
||||
loop_buses.sort() |
||||
if(recv_buses != loop_buses): |
||||
return False |
||||
else: |
||||
return True |
||||
|
||||
if __name__ == "__main__": |
||||
parser = argparse.ArgumentParser() |
||||
parser.add_argument("-n", type=int, help="Number of test iterations to run") |
||||
parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) |
||||
args = parser.parse_args() |
||||
|
||||
if args.n is None: |
||||
while True: |
||||
run_test(sleep_duration=args.sleep) |
||||
else: |
||||
for i in range(args.n): |
||||
run_test(sleep_duration=args.sleep) |
@ -0,0 +1,19 @@ |
||||
#!/usr/bin/env python |
||||
import time |
||||
from panda import Panda |
||||
|
||||
if __name__ == "__main__": |
||||
panda_serials = Panda.list() |
||||
pandas = [] |
||||
for ps in panda_serials: |
||||
pandas.append(Panda(serial=ps)) |
||||
if len(pandas) == 0: |
||||
print("No pandas connected") |
||||
assert False |
||||
|
||||
while True: |
||||
for panda in pandas: |
||||
print(panda.health()) |
||||
print("\n") |
||||
time.sleep(0.5) |
||||
|
Loading…
Reference in new issue