9881e6118 Panda for Mazda (#165) 9a15d2f5b fix version newline a8ed7d219 add subaru outback/legacy to subaru safety (#259) bdeb1c953 mazda is #12 14ea4d2e0 merge safety gm in a single file bf1ef875e Add GM passive safety mode (#266) c131fffae fix canflash for pedal (#267) 3397b1527 only allow bootloader entry on debug builds d68356b92 Honda Nidec: fwd stock AEB (#257) 6f532c6d5 Black panda Jenkins (#256) d68508c79 Gpio race condition fix (#263) d69d05fc0 Fixed pedal not initializing (#262) 36067e01c Honda safety: fixed incorrect brake decoding. Due to the specific limit of 255, this change does not affect the safety behavior git-subtree-dir: panda git-subtree-split: 9881e61184ad0417e9e080767f09585a9c777621pull/806/head
							parent
							
								
									9955b3c806
								
							
						
					
					
						commit
						876256a268
					
				
				 38 changed files with 1310 additions and 445 deletions
			
			
		| @ -0,0 +1,169 @@ | ||||
| 
 | ||||
| // CAN msgs we care about
 | ||||
| #define MAZDA_LKAS 0x243 | ||||
| #define MAZDA_LANEINFO 0x440 | ||||
| #define MAZDA_CRZ_CTRL 0x21c | ||||
| #define MAZDA_WHEEL_SPEED 0x215 | ||||
| #define MAZDA_STEER_TORQUE 0x240 | ||||
| 
 | ||||
| // CAN bus numbers
 | ||||
| #define MAZDA_MAIN 0 | ||||
| #define MAZDA_AUX 1 | ||||
| #define MAZDA_CAM 2 | ||||
| 
 | ||||
| #define MAZDA_MAX_STEER 2048 | ||||
| 
 | ||||
| // max delta torque allowed for real time checks
 | ||||
| #define MAZDA_MAX_RT_DELTA 940 | ||||
| // 250ms between real time checks
 | ||||
| #define MAZDA_RT_INTERVAL 250000 | ||||
| #define MAZDA_MAX_RATE_UP 10 | ||||
| #define MAZDA_MAX_RATE_DOWN 25 | ||||
| #define MAZDA_DRIVER_TORQUE_ALLOWANCE 15 | ||||
| #define MAZDA_DRIVER_TORQUE_FACTOR 1 | ||||
| 
 | ||||
| 
 | ||||
| int mazda_cruise_engaged_last = 0; | ||||
| int mazda_rt_torque_last = 0; | ||||
| int mazda_desired_torque_last = 0; | ||||
| uint32_t mazda_ts_last = 0; | ||||
| struct sample_t mazda_torque_driver;         // last few driver torques measured
 | ||||
| 
 | ||||
| // track msgs coming from OP so that we know what CAM msgs to drop and what to forward
 | ||||
| int mazda_op_lkas_detected = 0; | ||||
| int mazda_op_laneinfo_detected = 0; | ||||
| 
 | ||||
| int mazda_forward_cam = 0; | ||||
| int mazda_giraffe_switch_2_on = 0; | ||||
| 
 | ||||
| void mazda_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) { | ||||
|   int bus = GET_BUS(to_push); | ||||
|   int addr = GET_ADDR(to_push); | ||||
| 
 | ||||
|   if ((addr == MAZDA_STEER_TORQUE) && (bus == MAZDA_MAIN)) { | ||||
|     int torque_driver_new = GET_BYTE(to_push, 0) - 127; | ||||
|     // update array of samples
 | ||||
|     update_sample(&mazda_torque_driver, torque_driver_new); | ||||
|   } | ||||
| 
 | ||||
|   // enter controls on rising edge of ACC, exit controls on ACC off
 | ||||
|   if ((addr == MAZDA_CRZ_CTRL) && (bus == MAZDA_MAIN)) { | ||||
|     int cruise_engaged = GET_BYTE(to_push, 0) & 8; | ||||
|     if (cruise_engaged != 0) { | ||||
|       if (!mazda_cruise_engaged_last) { | ||||
| 	controls_allowed = 1; | ||||
|       } | ||||
|     } | ||||
|     else { | ||||
|       controls_allowed = 0; | ||||
|     } | ||||
|     mazda_cruise_engaged_last = cruise_engaged; | ||||
|   } | ||||
| 
 | ||||
|   // we have msgs on bus MAZDA_CAM
 | ||||
|   if (bus == MAZDA_CAM) { | ||||
|     // the stock CAM is connected
 | ||||
|     if (addr == MAZDA_LKAS) { | ||||
|       mazda_forward_cam = 1; | ||||
|     } | ||||
|     // if we see wheel speed msgs on MAZDA_CAM bus then giraffe switch 2 is high
 | ||||
|     // (hardware passthru)
 | ||||
|     if (addr == MAZDA_WHEEL_SPEED) { | ||||
|       mazda_giraffe_switch_2_on = 1; | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| static int mazda_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) { | ||||
|   int tx = 1; | ||||
|   int addr = GET_ADDR(to_send); | ||||
|   int bus = GET_BUS(to_send); | ||||
| 
 | ||||
|   // Check if msg is sent on the main BUS
 | ||||
|   if (bus == MAZDA_MAIN) { | ||||
|     if ((addr == MAZDA_LKAS) && !mazda_op_lkas_detected){ | ||||
|       mazda_op_lkas_detected = 1; | ||||
|     } | ||||
|     if ((addr == MAZDA_LANEINFO) && !mazda_op_laneinfo_detected){ | ||||
|       mazda_op_laneinfo_detected = 1; | ||||
|     } | ||||
| 
 | ||||
|     // steer cmd checks
 | ||||
|     if (addr == MAZDA_LKAS) { | ||||
|       int desired_torque = (((GET_BYTE(to_send, 0) & 0x0f) << 8) | GET_BYTE(to_send, 1)) - MAZDA_MAX_STEER; | ||||
|       bool violation = 0; | ||||
|       uint32_t ts = TIM2->CNT; | ||||
| 
 | ||||
|       if (controls_allowed) { | ||||
| 
 | ||||
| 	// *** global torque limit check ***
 | ||||
| 	violation |= max_limit_check(desired_torque, MAZDA_MAX_STEER, -MAZDA_MAX_STEER); | ||||
| 
 | ||||
| 	// *** torque rate limit check ***
 | ||||
| 	int desired_torque_last = mazda_desired_torque_last; | ||||
| 	violation |= driver_limit_check(desired_torque, desired_torque_last, &mazda_torque_driver, | ||||
| 					MAZDA_MAX_STEER, MAZDA_MAX_RATE_UP, MAZDA_MAX_RATE_DOWN, | ||||
| 					MAZDA_DRIVER_TORQUE_ALLOWANCE, MAZDA_DRIVER_TORQUE_FACTOR); | ||||
| 	// used next time
 | ||||
| 	mazda_desired_torque_last = desired_torque; | ||||
| 
 | ||||
| 	// *** torque real time rate limit check ***
 | ||||
| 	violation |= rt_rate_limit_check(desired_torque, mazda_rt_torque_last, MAZDA_MAX_RT_DELTA); | ||||
| 
 | ||||
| 	// every RT_INTERVAL set the new limits
 | ||||
| 	uint32_t ts_elapsed = get_ts_elapsed(ts, mazda_ts_last); | ||||
| 	if (ts_elapsed > ((uint32_t) MAZDA_RT_INTERVAL)) { | ||||
| 	  mazda_rt_torque_last = desired_torque; | ||||
| 	  mazda_ts_last = ts; | ||||
| 	} | ||||
|       } | ||||
| 
 | ||||
|       // no torque if controls is not allowed
 | ||||
|       if (!controls_allowed && (desired_torque != 0)) { | ||||
| 	violation = 1; | ||||
|       } | ||||
| 
 | ||||
|       // reset to 0 if either controls is not allowed or there's a violation
 | ||||
|       if (violation || !controls_allowed) { | ||||
| 	mazda_desired_torque_last = 0; | ||||
| 	mazda_rt_torque_last = 0; | ||||
| 	mazda_ts_last = ts; | ||||
|       } | ||||
| 
 | ||||
|       if (violation) { | ||||
| 	tx = 0; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   return tx; | ||||
| } | ||||
| 
 | ||||
| static int mazda_fwd_hook(int bus, CAN_FIFOMailBox_TypeDef *to_fwd) { | ||||
|   int bus_fwd = -1; | ||||
|   if (mazda_forward_cam && !mazda_giraffe_switch_2_on) { | ||||
|     int addr = GET_ADDR(to_fwd); | ||||
|     if (bus == MAZDA_MAIN) { | ||||
|       bus_fwd = MAZDA_CAM; | ||||
|     } | ||||
|     else if (bus == MAZDA_CAM) { | ||||
|       // drop stock CAM_LKAS and CAM_LANEINFI if OP is sending them
 | ||||
|       if (!((addr == MAZDA_LKAS) && mazda_op_lkas_detected) && | ||||
|           !((addr == MAZDA_LANEINFO) && mazda_op_laneinfo_detected)) { | ||||
| 	bus_fwd = MAZDA_MAIN; | ||||
|       } | ||||
|     } | ||||
|     else { | ||||
|       bus_fwd = -1; | ||||
|     } | ||||
|   } | ||||
|   return bus_fwd; | ||||
| } | ||||
| 
 | ||||
| const safety_hooks mazda_hooks = { | ||||
|   .init = nooutput_init, | ||||
|   .rx = mazda_rx_hook, | ||||
|   .tx = mazda_tx_hook, | ||||
|   .tx_lin = nooutput_tx_lin_hook, | ||||
|   .ignition = default_ign_hook, | ||||
|   .fwd = mazda_fwd_hook, | ||||
| }; | ||||
| @ -1,15 +1,13 @@ | ||||
| import os | ||||
| from panda import Panda | ||||
| from helpers import panda_color_to_serial, test_white_and_grey | ||||
| from helpers import panda_type_to_serial, test_white_and_grey, test_all_pandas, panda_connect_and_init | ||||
| 
 | ||||
| @test_white_and_grey | ||||
| @panda_color_to_serial | ||||
| def test_recover(serial=None): | ||||
|   p = Panda(serial=serial) | ||||
| @test_all_pandas | ||||
| @panda_connect_and_init | ||||
| def test_recover(p): | ||||
|   assert p.recover(timeout=30) | ||||
| 
 | ||||
| @test_white_and_grey | ||||
| @panda_color_to_serial | ||||
| def test_flash(serial=None): | ||||
|   p = Panda(serial=serial) | ||||
| @test_all_pandas | ||||
| @panda_connect_and_init | ||||
| def test_flash(p): | ||||
|   p.flash() | ||||
|  | ||||
| @ -0,0 +1,169 @@ | ||||
| #!/usr/bin/env python | ||||
| 
 | ||||
| # Loopback test between black panda (+ harness and power) and white/grey panda | ||||
| # Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test. | ||||
| # To be sure, the test should be run with both harness orientations | ||||
| 
 | ||||
| from __future__ import print_function | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| import random | ||||
| import argparse | ||||
| 
 | ||||
| from hexdump import hexdump | ||||
| from itertools import permutations | ||||
| 
 | ||||
| sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) | ||||
| from panda import Panda | ||||
| 
 | ||||
| def get_test_string(): | ||||
|   return b"test"+os.urandom(10) | ||||
| 
 | ||||
| counter = 0 | ||||
| nonzero_bus_errors = 0 | ||||
| zero_bus_errors = 0 | ||||
| content_errors = 0 | ||||
| 
 | ||||
| def run_test(sleep_duration): | ||||
|   global counter, nonzero_bus_errors, zero_bus_errors, content_errors | ||||
| 
 | ||||
|   pandas = Panda.list() | ||||
|   print(pandas) | ||||
| 
 | ||||
|   # make sure two pandas are connected | ||||
|   if len(pandas) != 2: | ||||
|     print("Connect white/grey and black panda to run this test!") | ||||
|     assert False | ||||
| 
 | ||||
|   # connect | ||||
|   pandas[0] = Panda(pandas[0]) | ||||
|   pandas[1] = Panda(pandas[1]) | ||||
| 
 | ||||
|   # find out which one is black | ||||
|   type0 = pandas[0].get_type() | ||||
|   type1 = pandas[1].get_type() | ||||
| 
 | ||||
|   black_panda = None | ||||
|   other_panda = None | ||||
|    | ||||
|   if type0 == "\x03" and type1 != "\x03": | ||||
|     black_panda = pandas[0] | ||||
|     other_panda = pandas[1] | ||||
|   elif type0 != "\x03" and type1 == "\x03": | ||||
|     black_panda = pandas[1] | ||||
|     other_panda = pandas[0] | ||||
|   else: | ||||
|     print("Connect white/grey and black panda to run this test!") | ||||
|     assert False | ||||
| 
 | ||||
|   # disable safety modes | ||||
|   black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
|   other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
| 
 | ||||
|   # test health packet | ||||
|   print("black panda health", black_panda.health()) | ||||
|   print("other panda health", other_panda.health()) | ||||
| 
 | ||||
|   # test black -> other | ||||
|   while True: | ||||
|     test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration) | ||||
|     test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration) | ||||
|     counter += 1 | ||||
|     print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors) | ||||
|      | ||||
|     # Toggle relay | ||||
|     black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) | ||||
|     time.sleep(1) | ||||
|     black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
|     time.sleep(1) | ||||
| 	 | ||||
| 
 | ||||
| def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): | ||||
|   global nonzero_bus_errors, zero_bus_errors, content_errors | ||||
| 
 | ||||
|   if direction: | ||||
|     print("***************** TESTING (BLACK --> OTHER) *****************") | ||||
|   else: | ||||
|     print("***************** TESTING (OTHER --> BLACK) *****************") | ||||
| 
 | ||||
|   for send_bus, obd, recv_buses in test_array: | ||||
|     black_panda.send_heartbeat() | ||||
|     other_panda.send_heartbeat() | ||||
|     print("\ntest can: ", send_bus, " OBD: ", obd) | ||||
|      | ||||
|     # set OBD on black panda | ||||
|     black_panda.set_gmlan(True if obd else None) | ||||
| 
 | ||||
|     # clear and flush | ||||
|     if direction: | ||||
|       black_panda.can_clear(send_bus) | ||||
|     else: | ||||
|       other_panda.can_clear(send_bus) | ||||
| 
 | ||||
|     for recv_bus in recv_buses: | ||||
|       if direction: | ||||
|         other_panda.can_clear(recv_bus) | ||||
|       else: | ||||
| 	black_panda.can_clear(recv_bus) | ||||
|      | ||||
|     black_panda.can_recv() | ||||
|     other_panda.can_recv() | ||||
| 
 | ||||
|     # send the characters | ||||
|     at = random.randint(1, 2000) | ||||
|     st = get_test_string()[0:8] | ||||
|     if direction: | ||||
|       black_panda.can_send(at, st, send_bus) | ||||
|     else: | ||||
|       other_panda.can_send(at, st, send_bus) | ||||
|     time.sleep(0.1) | ||||
| 
 | ||||
|     # check for receive | ||||
|     if direction: | ||||
|       cans_echo = black_panda.can_recv() | ||||
|       cans_loop = other_panda.can_recv() | ||||
|     else: | ||||
|       cans_echo = other_panda.can_recv() | ||||
|       cans_loop = black_panda.can_recv() | ||||
| 
 | ||||
|     loop_buses = [] | ||||
|     for loop in cans_loop: | ||||
|       if (loop[0] != at) or (loop[2] != st): | ||||
| 	      content_errors += 1 | ||||
| 
 | ||||
|       print("  Loop on bus", str(loop[3])) | ||||
|       loop_buses.append(loop[3]) | ||||
|     if len(cans_loop) == 0: | ||||
|       print("  No loop") | ||||
|       if not os.getenv("NOASSERT"): | ||||
|         assert False | ||||
|      | ||||
|     # test loop buses | ||||
|     recv_buses.sort() | ||||
|     loop_buses.sort() | ||||
|     if(recv_buses != loop_buses): | ||||
|       if len(loop_buses) == 0: | ||||
| 	      zero_bus_errors += 1 | ||||
|       else: | ||||
| 	      nonzero_bus_errors += 1 | ||||
|       if not os.getenv("NOASSERT"): | ||||
|         assert False | ||||
|     else: | ||||
|       print("  TEST PASSED") | ||||
| 
 | ||||
|     time.sleep(sleep_duration) | ||||
|   print("\n") | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   parser = argparse.ArgumentParser() | ||||
|   parser.add_argument("-n", type=int, help="Number of test iterations to run") | ||||
|   parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) | ||||
|   args = parser.parse_args() | ||||
| 
 | ||||
|   if args.n is None: | ||||
|     while True: | ||||
|       run_test(sleep_duration=args.sleep) | ||||
|   else: | ||||
|     for i in range(args.n): | ||||
|       run_test(sleep_duration=args.sleep) | ||||
| @ -0,0 +1,175 @@ | ||||
| #!/usr/bin/env python | ||||
| 
 | ||||
| # Loopback test between black panda (+ harness and power) and white/grey panda | ||||
| # Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test. | ||||
| # To be sure, the test should be run with both harness orientations | ||||
| 
 | ||||
| from __future__ import print_function | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| import random | ||||
| import argparse | ||||
| 
 | ||||
| from hexdump import hexdump | ||||
| from itertools import permutations | ||||
| 
 | ||||
| sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) | ||||
| from panda import Panda | ||||
| 
 | ||||
| def get_test_string(): | ||||
|   return b"test"+os.urandom(10) | ||||
| 
 | ||||
| counter = 0 | ||||
| nonzero_bus_errors = 0 | ||||
| zero_bus_errors = 0 | ||||
| content_errors = 0 | ||||
| 
 | ||||
| def run_test(sleep_duration): | ||||
|   global counter, nonzero_bus_errors, zero_bus_errors, content_errors | ||||
| 
 | ||||
|   pandas = Panda.list() | ||||
|   print(pandas) | ||||
| 
 | ||||
|   # make sure two pandas are connected | ||||
|   if len(pandas) != 2: | ||||
|     print("Connect white/grey and black panda to run this test!") | ||||
|     assert False | ||||
| 
 | ||||
|   # connect | ||||
|   pandas[0] = Panda(pandas[0]) | ||||
|   pandas[1] = Panda(pandas[1]) | ||||
| 
 | ||||
|   # find out which one is black | ||||
|   type0 = pandas[0].get_type() | ||||
|   type1 = pandas[1].get_type() | ||||
| 
 | ||||
|   black_panda = None | ||||
|   other_panda = None | ||||
|    | ||||
|   if type0 == "\x03" and type1 != "\x03": | ||||
|     black_panda = pandas[0] | ||||
|     other_panda = pandas[1] | ||||
|   elif type0 != "\x03" and type1 == "\x03": | ||||
|     black_panda = pandas[1] | ||||
|     other_panda = pandas[0] | ||||
|   else: | ||||
|     print("Connect white/grey and black panda to run this test!") | ||||
|     assert False | ||||
| 
 | ||||
|   # disable safety modes | ||||
|   black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
|   other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
| 
 | ||||
|   # test health packet | ||||
|   print("black panda health", black_panda.health()) | ||||
|   print("other panda health", other_panda.health()) | ||||
| 
 | ||||
|   # test black -> other | ||||
|   start_time = time.time() | ||||
|   temp_start_time = start_time | ||||
|   while True: | ||||
|     test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration) | ||||
|     test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration) | ||||
|     counter += 1 | ||||
|      | ||||
|     runtime = time.time() - start_time | ||||
|     print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors, "Runtime: ", runtime) | ||||
| 
 | ||||
|     if (time.time() - temp_start_time) > 3600*6: | ||||
|     	# Toggle relay | ||||
|     	black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) | ||||
|     	time.sleep(1) | ||||
| 	black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
|     	time.sleep(1) | ||||
| 	temp_start_time = time.time() | ||||
| 	 | ||||
| 
 | ||||
| def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): | ||||
|   global nonzero_bus_errors, zero_bus_errors, content_errors | ||||
| 
 | ||||
|   if direction: | ||||
|     print("***************** TESTING (BLACK --> OTHER) *****************") | ||||
|   else: | ||||
|     print("***************** TESTING (OTHER --> BLACK) *****************") | ||||
| 
 | ||||
|   for send_bus, obd, recv_buses in test_array: | ||||
|     black_panda.send_heartbeat() | ||||
|     other_panda.send_heartbeat() | ||||
|     print("\ntest can: ", send_bus, " OBD: ", obd) | ||||
|      | ||||
|     # set OBD on black panda | ||||
|     black_panda.set_gmlan(True if obd else None) | ||||
| 
 | ||||
|     # clear and flush | ||||
|     if direction: | ||||
|       black_panda.can_clear(send_bus) | ||||
|     else: | ||||
|       other_panda.can_clear(send_bus) | ||||
| 
 | ||||
|     for recv_bus in recv_buses: | ||||
|       if direction: | ||||
|         other_panda.can_clear(recv_bus) | ||||
|       else: | ||||
| 	black_panda.can_clear(recv_bus) | ||||
|      | ||||
|     black_panda.can_recv() | ||||
|     other_panda.can_recv() | ||||
| 
 | ||||
|     # send the characters | ||||
|     at = random.randint(1, 2000) | ||||
|     st = get_test_string()[0:8] | ||||
|     if direction: | ||||
|       black_panda.can_send(at, st, send_bus) | ||||
|     else: | ||||
|       other_panda.can_send(at, st, send_bus) | ||||
|     time.sleep(0.1) | ||||
| 
 | ||||
|     # check for receive | ||||
|     if direction: | ||||
|       cans_echo = black_panda.can_recv() | ||||
|       cans_loop = other_panda.can_recv() | ||||
|     else: | ||||
|       cans_echo = other_panda.can_recv() | ||||
|       cans_loop = black_panda.can_recv() | ||||
| 
 | ||||
|     loop_buses = [] | ||||
|     for loop in cans_loop: | ||||
|       if (loop[0] != at) or (loop[2] != st): | ||||
| 	      content_errors += 1 | ||||
| 
 | ||||
|       print("  Loop on bus", str(loop[3])) | ||||
|       loop_buses.append(loop[3]) | ||||
|     if len(cans_loop) == 0: | ||||
|       print("  No loop") | ||||
|       if not os.getenv("NOASSERT"): | ||||
|         assert False | ||||
|      | ||||
|     # test loop buses | ||||
|     recv_buses.sort() | ||||
|     loop_buses.sort() | ||||
|     if(recv_buses != loop_buses): | ||||
|       if len(loop_buses) == 0: | ||||
| 	      zero_bus_errors += 1 | ||||
|       else: | ||||
| 	      nonzero_bus_errors += 1 | ||||
|       if not os.getenv("NOASSERT"): | ||||
|         assert False | ||||
|     else: | ||||
|       print("  TEST PASSED") | ||||
| 
 | ||||
|     time.sleep(sleep_duration) | ||||
|   print("\n") | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   parser = argparse.ArgumentParser() | ||||
|   parser.add_argument("-n", type=int, help="Number of test iterations to run") | ||||
|   parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) | ||||
|   args = parser.parse_args() | ||||
| 
 | ||||
|   if args.n is None: | ||||
|     while True: | ||||
|       run_test(sleep_duration=args.sleep) | ||||
|   else: | ||||
|     for i in range(args.n): | ||||
|       run_test(sleep_duration=args.sleep) | ||||
| @ -0,0 +1,145 @@ | ||||
| #!/usr/bin/env python | ||||
| 
 | ||||
| # Relay test with loopback between black panda (+ harness and power) and white/grey panda | ||||
| # Tests the relay switching multiple times / second by looking at the buses on which loop occurs. | ||||
| 
 | ||||
| from __future__ import print_function | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| import random | ||||
| import argparse | ||||
| 
 | ||||
| from hexdump import hexdump | ||||
| from itertools import permutations | ||||
| 
 | ||||
| sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) | ||||
| from panda import Panda | ||||
| 
 | ||||
| def get_test_string(): | ||||
|   return b"test"+os.urandom(10) | ||||
| 
 | ||||
| counter = 0 | ||||
| open_errors = 0 | ||||
| closed_errors = 0 | ||||
| content_errors = 0 | ||||
| 
 | ||||
| def run_test(sleep_duration): | ||||
|   global counter, open_errors, closed_errors, content_errors | ||||
| 
 | ||||
|   pandas = Panda.list() | ||||
|   #pandas = ["540046000c51363338383037", "07801b800f51363038363036"] | ||||
|   print(pandas) | ||||
| 
 | ||||
|   # make sure two pandas are connected | ||||
|   if len(pandas) != 2: | ||||
|     print("Connect white/grey and black panda to run this test!") | ||||
|     assert False | ||||
| 
 | ||||
|   # connect | ||||
|   pandas[0] = Panda(pandas[0]) | ||||
|   pandas[1] = Panda(pandas[1]) | ||||
| 
 | ||||
|   # find out which one is black | ||||
|   type0 = pandas[0].get_type() | ||||
|   type1 = pandas[1].get_type() | ||||
| 
 | ||||
|   black_panda = None | ||||
|   other_panda = None | ||||
|    | ||||
|   if type0 == "\x03" and type1 != "\x03": | ||||
|     black_panda = pandas[0] | ||||
|     other_panda = pandas[1] | ||||
|   elif type0 != "\x03" and type1 == "\x03": | ||||
|     black_panda = pandas[1] | ||||
|     other_panda = pandas[0] | ||||
|   else: | ||||
|     print("Connect white/grey and black panda to run this test!") | ||||
|     assert False | ||||
| 
 | ||||
|   # disable safety modes | ||||
|   black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
|   other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
| 
 | ||||
|   # test health packet | ||||
|   print("black panda health", black_panda.health()) | ||||
|   print("other panda health", other_panda.health()) | ||||
| 
 | ||||
|   # test black -> other | ||||
|   while True: | ||||
|     # Switch on relay | ||||
|     black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) | ||||
|     time.sleep(0.05) | ||||
| 
 | ||||
|     if not test_buses(black_panda, other_panda, (0, False, [0])): | ||||
|       open_errors += 1 | ||||
|       print("Open error") | ||||
|       assert False | ||||
| 
 | ||||
|     # Switch off relay | ||||
|     black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) | ||||
|     time.sleep(0.05) | ||||
| 
 | ||||
|     if not test_buses(black_panda, other_panda, (0, False, [0, 2])): | ||||
|       closed_errors += 1 | ||||
|       print("Close error") | ||||
|       assert False | ||||
| 
 | ||||
|     counter += 1 | ||||
|     print("Number of cycles:", counter, "Open errors:", open_errors, "Closed errors:", closed_errors, "Content errors:", content_errors)	 | ||||
| 
 | ||||
| def test_buses(black_panda, other_panda, test_obj): | ||||
|   global content_errors | ||||
|   send_bus, obd, recv_buses = test_obj | ||||
|      | ||||
|   black_panda.send_heartbeat() | ||||
|   other_panda.send_heartbeat() | ||||
|      | ||||
|   # Set OBD on send panda | ||||
|   other_panda.set_gmlan(True if obd else None) | ||||
| 
 | ||||
|   # clear and flush | ||||
|   other_panda.can_clear(send_bus) | ||||
| 
 | ||||
|   for recv_bus in recv_buses: | ||||
|     black_panda.can_clear(recv_bus) | ||||
|      | ||||
|   black_panda.can_recv() | ||||
|   other_panda.can_recv() | ||||
| 
 | ||||
|   # send the characters | ||||
|   at = random.randint(1, 2000) | ||||
|   st = get_test_string()[0:8] | ||||
|   other_panda.can_send(at, st, send_bus) | ||||
|   time.sleep(0.05) | ||||
| 
 | ||||
|   # check for receive | ||||
|   cans_echo = other_panda.can_recv() | ||||
|   cans_loop = black_panda.can_recv() | ||||
| 
 | ||||
|   loop_buses = [] | ||||
|   for loop in cans_loop: | ||||
|     if (loop[0] != at) or (loop[2] != st): | ||||
|       content_errors += 1 | ||||
|     loop_buses.append(loop[3]) | ||||
|      | ||||
|   # test loop buses | ||||
|   recv_buses.sort() | ||||
|   loop_buses.sort() | ||||
|   if(recv_buses != loop_buses): | ||||
|     return False | ||||
|   else: | ||||
|     return True | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   parser = argparse.ArgumentParser() | ||||
|   parser.add_argument("-n", type=int, help="Number of test iterations to run") | ||||
|   parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) | ||||
|   args = parser.parse_args() | ||||
| 
 | ||||
|   if args.n is None: | ||||
|     while True: | ||||
|       run_test(sleep_duration=args.sleep) | ||||
|   else: | ||||
|     for i in range(args.n): | ||||
|       run_test(sleep_duration=args.sleep) | ||||
| @ -0,0 +1,19 @@ | ||||
| #!/usr/bin/env python | ||||
| import time | ||||
| from panda import Panda | ||||
| 	 | ||||
| if __name__ == "__main__": | ||||
|   panda_serials = Panda.list() | ||||
|   pandas = [] | ||||
|   for ps in panda_serials: | ||||
|     pandas.append(Panda(serial=ps)) | ||||
|   if len(pandas) == 0: | ||||
|     print("No pandas connected") | ||||
|     assert False | ||||
| 
 | ||||
|   while True: | ||||
|     for panda in pandas: | ||||
|       print(panda.health()) | ||||
|     print("\n") | ||||
|     time.sleep(0.5) | ||||
| 
 | ||||
					Loading…
					
					
				
		Reference in new issue