commit
94a27e351f
34 changed files with 1502 additions and 135 deletions
@ -0,0 +1,41 @@ |
||||
version: 2 |
||||
jobs: |
||||
safety: |
||||
machine: |
||||
docker_layer_caching: true |
||||
steps: |
||||
- checkout |
||||
- run: |
||||
name: Build image |
||||
command: "docker build -t panda_safety -f tests/safety/Dockerfile ." |
||||
- run: |
||||
name: Run safety test |
||||
command: | |
||||
docker run panda_safety /bin/bash -c "cd /panda/tests/safety; ./test.sh" |
||||
build: |
||||
machine: |
||||
docker_layer_caching: true |
||||
steps: |
||||
- checkout |
||||
- run: |
||||
name: Build image |
||||
command: "docker build -t panda_build -f tests/build/Dockerfile ." |
||||
- run: |
||||
name: Test python package installer |
||||
command: | |
||||
docker run panda_build /bin/bash -c "cd /panda; python setup.py install" |
||||
- run: |
||||
name: Build STM image |
||||
command: | |
||||
docker run panda_build /bin/bash -c "cd /panda/board; make bin" |
||||
- run: |
||||
name: Build ESP image |
||||
command: | |
||||
docker run panda_build /bin/bash -c "cd /panda/boardesp; make user1.bin" |
||||
|
||||
workflows: |
||||
version: 2 |
||||
main: |
||||
jobs: |
||||
- safety |
||||
- build |
@ -1,20 +0,0 @@ |
||||
language: python |
||||
|
||||
cache: |
||||
directories: |
||||
- build/commaai/panda/boardesp/esp-open-sdk/crosstool-NG |
||||
|
||||
addons: |
||||
apt: |
||||
packages: |
||||
- gcc-arm-none-eabi |
||||
- libnewlib-arm-none-eabi |
||||
- gperf |
||||
- texinfo |
||||
- help2man |
||||
|
||||
script: |
||||
- python setup.py install |
||||
- pushd board && make bin && popd |
||||
- pushd boardesp && git clone --recursive https://github.com/pfalcon/esp-open-sdk.git && pushd esp-open-sdk && git checkout 03f5e898a059451ec5f3de30e7feff30455f7cec && LD_LIBRARY_PATH="" make STANDALONE=y && popd && popd |
||||
- pushd boardesp && make user1.bin && popd |
@ -0,0 +1,193 @@ |
||||
// uses tons from safety_toyota
|
||||
// TODO: refactor to repeat less code
|
||||
|
||||
// IPAS override
|
||||
const int32_t IPAS_OVERRIDE_THRESHOLD = 200; // disallow controls when user torque exceeds this value
|
||||
|
||||
struct lookup_t { |
||||
float x[3]; |
||||
float y[3]; |
||||
}; |
||||
|
||||
// 2m/s are added to be less restrictive
|
||||
const struct lookup_t LOOKUP_ANGLE_RATE_UP = { |
||||
{2., 7., 17.}, |
||||
{5., .8, .15}}; |
||||
|
||||
const struct lookup_t LOOKUP_ANGLE_RATE_DOWN = { |
||||
{2., 7., 17.}, |
||||
{5., 3.5, .4}}; |
||||
|
||||
const float RT_ANGLE_FUDGE = 1.5; // for RT checks allow 50% more angle change
|
||||
const float CAN_TO_DEG = 2. / 3.; // convert angles from CAN unit to degrees
|
||||
|
||||
int ipas_state = 1; // 1 disabled, 3 executing angle control, 5 override
|
||||
int angle_control = 0; // 1 if direct angle control packets are seen
|
||||
float speed = 0.; |
||||
|
||||
struct sample_t angle_meas; // last 3 steer angles
|
||||
struct sample_t torque_driver; // last 3 driver steering torque
|
||||
|
||||
// state of angle limits
|
||||
int16_t desired_angle_last = 0; // last desired steer angle
|
||||
int16_t rt_angle_last = 0; // last desired torque for real time check
|
||||
uint32_t ts_angle_last = 0; |
||||
|
||||
int controls_allowed_last = 0; |
||||
|
||||
int to_signed(int d, int bits) { |
||||
if (d >= (1 << (bits - 1))) { |
||||
d -= (1 << bits); |
||||
} |
||||
return d; |
||||
} |
||||
|
||||
// interp function that holds extreme values
|
||||
float interpolate(struct lookup_t xy, float x) { |
||||
int size = sizeof(xy.x) / sizeof(xy.x[0]); |
||||
// x is lower than the first point in the x array. Return the first point
|
||||
if (x <= xy.x[0]) { |
||||
return xy.y[0]; |
||||
|
||||
} else { |
||||
// find the index such that (xy.x[i] <= x < xy.x[i+1]) and linearly interp
|
||||
for (int i=0; i < size-1; i++) { |
||||
if (x < xy.x[i+1]) { |
||||
float x0 = xy.x[i]; |
||||
float y0 = xy.y[i]; |
||||
float dx = xy.x[i+1] - x0; |
||||
float dy = xy.y[i+1] - y0; |
||||
// dx should not be zero as xy.x is supposed ot be monotonic
|
||||
if (dx <= 0.) dx = 0.0001; |
||||
return dy * (x - x0) / dx + y0; |
||||
} |
||||
} |
||||
// if no such point is found, then x > xy.x[size-1]. Return last point
|
||||
return xy.y[size - 1]; |
||||
} |
||||
} |
||||
|
||||
|
||||
static void toyota_ipas_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) { |
||||
// check standard toyota stuff as well
|
||||
toyota_rx_hook(to_push); |
||||
|
||||
if ((to_push->RIR>>21) == 0x260) { |
||||
// get driver steering torque
|
||||
int16_t torque_driver_new = (((to_push->RDLR) & 0xFF00) | ((to_push->RDLR >> 16) & 0xFF)); |
||||
|
||||
// update array of samples
|
||||
update_sample(&torque_driver, torque_driver_new); |
||||
} |
||||
|
||||
// get steer angle
|
||||
if ((to_push->RIR>>21) == 0x25) { |
||||
int angle_meas_new = ((to_push->RDLR & 0xf) << 8) + ((to_push->RDLR & 0xff00) >> 8); |
||||
uint32_t ts = TIM2->CNT; |
||||
|
||||
angle_meas_new = to_signed(angle_meas_new, 12); |
||||
|
||||
// update array of samples
|
||||
update_sample(&angle_meas, angle_meas_new); |
||||
|
||||
// *** angle real time check
|
||||
// add 1 to not false trigger the violation and multiply by 20 since the check is done every 250ms and steer angle is updated at 80Hz
|
||||
int rt_delta_angle_up = ((int)(RT_ANGLE_FUDGE * (interpolate(LOOKUP_ANGLE_RATE_UP, speed) * 20. * CAN_TO_DEG + 1.))); |
||||
int rt_delta_angle_down = ((int)(RT_ANGLE_FUDGE * (interpolate(LOOKUP_ANGLE_RATE_DOWN, speed) * 20. * CAN_TO_DEG + 1.))); |
||||
int highest_rt_angle = rt_angle_last + (rt_angle_last > 0? rt_delta_angle_up:rt_delta_angle_down); |
||||
int lowest_rt_angle = rt_angle_last - (rt_angle_last > 0? rt_delta_angle_down:rt_delta_angle_up); |
||||
|
||||
// every RT_INTERVAL or when controls are turned on, set the new limits
|
||||
uint32_t ts_elapsed = get_ts_elapsed(ts, ts_angle_last); |
||||
if ((ts_elapsed > RT_INTERVAL) || (controls_allowed && !controls_allowed_last)) { |
||||
rt_angle_last = angle_meas_new; |
||||
ts_angle_last = ts; |
||||
} |
||||
|
||||
// check for violation
|
||||
if (angle_control && |
||||
((angle_meas_new < lowest_rt_angle) || |
||||
(angle_meas_new > highest_rt_angle))) { |
||||
controls_allowed = 0; |
||||
} |
||||
|
||||
controls_allowed_last = controls_allowed; |
||||
} |
||||
|
||||
// get speed
|
||||
if ((to_push->RIR>>21) == 0xb4) { |
||||
speed = ((float) (((to_push->RDHR) & 0xFF00) | ((to_push->RDHR >> 16) & 0xFF))) * 0.01 / 3.6; |
||||
} |
||||
|
||||
// get ipas state
|
||||
if ((to_push->RIR>>21) == 0x262) { |
||||
ipas_state = (to_push->RDLR & 0xf); |
||||
} |
||||
|
||||
// exit controls on high steering override
|
||||
if (angle_control && ((torque_driver.min > IPAS_OVERRIDE_THRESHOLD) || |
||||
(torque_driver.max < -IPAS_OVERRIDE_THRESHOLD) || |
||||
(ipas_state==5))) { |
||||
controls_allowed = 0; |
||||
} |
||||
} |
||||
|
||||
static int toyota_ipas_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) { |
||||
|
||||
// Check if msg is sent on BUS 0
|
||||
if (((to_send->RDTR >> 4) & 0xF) == 0) { |
||||
|
||||
// STEER ANGLE
|
||||
if (((to_send->RIR>>21) == 0x266) || ((to_send->RIR>>21) == 0x167)) { |
||||
|
||||
angle_control = 1; // we are in angle control mode
|
||||
int desired_angle = ((to_send->RDLR & 0xf) << 8) + ((to_send->RDLR & 0xff00) >> 8); |
||||
int ipas_state_cmd = ((to_send->RDLR & 0xff) >> 4); |
||||
int16_t violation = 0; |
||||
|
||||
desired_angle = to_signed(desired_angle, 12); |
||||
|
||||
if (controls_allowed) { |
||||
// add 1 to not false trigger the violation
|
||||
int delta_angle_up = (int) (interpolate(LOOKUP_ANGLE_RATE_UP, speed) * CAN_TO_DEG + 1.); |
||||
int delta_angle_down = (int) (interpolate(LOOKUP_ANGLE_RATE_DOWN, speed) * CAN_TO_DEG + 1.); |
||||
int highest_desired_angle = desired_angle_last + (desired_angle_last > 0? delta_angle_up:delta_angle_down); |
||||
int lowest_desired_angle = desired_angle_last - (desired_angle_last > 0? delta_angle_down:delta_angle_up); |
||||
if ((desired_angle > highest_desired_angle) ||
|
||||
(desired_angle < lowest_desired_angle)){ |
||||
violation = 1; |
||||
controls_allowed = 0; |
||||
} |
||||
} |
||||
|
||||
// desired steer angle should be the same as steer angle measured when controls are off
|
||||
if ((!controls_allowed) &&
|
||||
((desired_angle < (angle_meas.min - 1)) || |
||||
(desired_angle > (angle_meas.max + 1)) || |
||||
(ipas_state_cmd != 1))) { |
||||
violation = 1; |
||||
} |
||||
|
||||
desired_angle_last = desired_angle; |
||||
|
||||
if (violation) { |
||||
return false; |
||||
} |
||||
|
||||
return true; |
||||
} |
||||
} |
||||
|
||||
// check standard toyota stuff as well
|
||||
return toyota_tx_hook(to_send); |
||||
} |
||||
|
||||
const safety_hooks toyota_ipas_hooks = { |
||||
.init = toyota_init, |
||||
.rx = toyota_ipas_rx_hook, |
||||
.tx = toyota_ipas_tx_hook, |
||||
.tx_lin = toyota_tx_lin_hook, |
||||
.ignition = toyota_ign_hook, |
||||
.fwd = toyota_fwd_hook, |
||||
}; |
||||
|
@ -0,0 +1,17 @@ |
||||
FROM ubuntu:16.04 |
||||
|
||||
RUN apt-get update && apt-get install -y gcc-arm-none-eabi libnewlib-arm-none-eabi python python-pip gcc g++ git autoconf gperf bison flex automake texinfo wget help2man gawk libtool libtool-bin ncurses-dev unzip unrar-free libexpat-dev sed bzip2 |
||||
|
||||
RUN pip install pycrypto==2.6.1 |
||||
|
||||
# Build esp toolchain |
||||
RUN mkdir -p /panda/boardesp |
||||
WORKDIR /panda/boardesp |
||||
RUN git clone --recursive https://github.com/pfalcon/esp-open-sdk.git |
||||
WORKDIR /panda/boardesp/esp-open-sdk |
||||
RUN git checkout 03f5e898a059451ec5f3de30e7feff30455f7ce |
||||
RUN CT_ALLOW_BUILD_AS_ROOT_SURE=1 make STANDALONE=y |
||||
|
||||
COPY . /panda |
||||
|
||||
WORKDIR /panda |
@ -0,0 +1,6 @@ |
||||
FROM ubuntu:16.04 |
||||
|
||||
RUN apt-get update && apt-get install -y clang make python python-pip |
||||
COPY tests/safety/requirements.txt /panda/tests/safety/requirements.txt |
||||
RUN pip install -r /panda/tests/safety/requirements.txt |
||||
COPY . /panda |
@ -0,0 +1,18 @@ |
||||
CC = clang
|
||||
CCFLAGS = -O3 -fPIC -DPANDA -I.
|
||||
|
||||
.PHONY: all |
||||
all: libpandasafety.so |
||||
|
||||
libpandasafety.so: test.o |
||||
$(CC) -shared -o '$@' $^ -lm
|
||||
|
||||
test.o: test.c |
||||
@echo "[ CC ] $@"
|
||||
$(CC) $(CCFLAGS) -MMD -c -I../../board -o '$@' '$<'
|
||||
|
||||
.PHONY: clean |
||||
clean: |
||||
rm -f libpandasafety.so test.o test.d
|
||||
|
||||
-include test.d |
@ -0,0 +1,60 @@ |
||||
import os |
||||
import subprocess |
||||
|
||||
from cffi import FFI |
||||
|
||||
can_dir = os.path.dirname(os.path.abspath(__file__)) |
||||
libpandasafety_fn = os.path.join(can_dir, "libpandasafety.so") |
||||
subprocess.check_call(["make"], cwd=can_dir) |
||||
|
||||
ffi = FFI() |
||||
ffi.cdef(""" |
||||
typedef struct |
||||
{ |
||||
uint32_t TIR; /*!< CAN TX mailbox identifier register */ |
||||
uint32_t TDTR; /*!< CAN mailbox data length control and time stamp register */ |
||||
uint32_t TDLR; /*!< CAN mailbox data low register */ |
||||
uint32_t TDHR; /*!< CAN mailbox data high register */ |
||||
} CAN_TxMailBox_TypeDef; |
||||
|
||||
typedef struct |
||||
{ |
||||
uint32_t RIR; /*!< CAN receive FIFO mailbox identifier register */ |
||||
uint32_t RDTR; /*!< CAN receive FIFO mailbox data length control and time stamp register */ |
||||
uint32_t RDLR; /*!< CAN receive FIFO mailbox data low register */ |
||||
uint32_t RDHR; /*!< CAN receive FIFO mailbox data high register */ |
||||
} CAN_FIFOMailBox_TypeDef; |
||||
|
||||
typedef struct |
||||
{ |
||||
uint32_t CNT; |
||||
} TIM_TypeDef; |
||||
|
||||
void toyota_rx_hook(CAN_FIFOMailBox_TypeDef *to_push); |
||||
int toyota_tx_hook(CAN_FIFOMailBox_TypeDef *to_send); |
||||
void toyota_init(int16_t param); |
||||
void set_controls_allowed(int c); |
||||
void reset_angle_control(void); |
||||
int get_controls_allowed(void); |
||||
void init_tests_toyota(void); |
||||
void set_timer(int t); |
||||
void set_torque_meas(int min, int max); |
||||
void set_rt_torque_last(int t); |
||||
void set_desired_torque_last(int t); |
||||
int get_torque_meas_min(void); |
||||
int get_torque_meas_max(void); |
||||
|
||||
void init_tests_honda(void); |
||||
int get_ego_speed(void); |
||||
void honda_init(int16_t param); |
||||
void honda_rx_hook(CAN_FIFOMailBox_TypeDef *to_push); |
||||
int honda_tx_hook(CAN_FIFOMailBox_TypeDef *to_send); |
||||
int get_brake_prev(void); |
||||
int get_gas_prev(void); |
||||
|
||||
void toyota_ipas_rx_hook(CAN_FIFOMailBox_TypeDef *to_push); |
||||
int toyota_ipas_tx_hook(CAN_FIFOMailBox_TypeDef *to_send); |
||||
|
||||
""") |
||||
|
||||
libpandasafety = ffi.dlopen(libpandasafety_fn) |
@ -0,0 +1,2 @@ |
||||
cffi==1.11.4 |
||||
numpy==1.14.1 |
@ -0,0 +1,108 @@ |
||||
#include <stdint.h> |
||||
#include <stdbool.h> |
||||
|
||||
typedef struct |
||||
{ |
||||
uint32_t TIR; /*!< CAN TX mailbox identifier register */ |
||||
uint32_t TDTR; /*!< CAN mailbox data length control and time stamp register */ |
||||
uint32_t TDLR; /*!< CAN mailbox data low register */ |
||||
uint32_t TDHR; /*!< CAN mailbox data high register */ |
||||
} CAN_TxMailBox_TypeDef; |
||||
|
||||
typedef struct |
||||
{ |
||||
uint32_t RIR; /*!< CAN receive FIFO mailbox identifier register */ |
||||
uint32_t RDTR; /*!< CAN receive FIFO mailbox data length control and time stamp register */ |
||||
uint32_t RDLR; /*!< CAN receive FIFO mailbox data low register */ |
||||
uint32_t RDHR; /*!< CAN receive FIFO mailbox data high register */ |
||||
} CAN_FIFOMailBox_TypeDef; |
||||
|
||||
typedef struct |
||||
{ |
||||
uint32_t CNT; |
||||
} TIM_TypeDef; |
||||
|
||||
struct sample_t torque_meas; |
||||
|
||||
TIM_TypeDef timer; |
||||
TIM_TypeDef *TIM2 = &timer; |
||||
|
||||
#define min(a,b) \ |
||||
({ __typeof__ (a) _a = (a); \
|
||||
__typeof__ (b) _b = (b); \
|
||||
_a < _b ? _a : _b; }) |
||||
|
||||
#define max(a,b) \ |
||||
({ __typeof__ (a) _a = (a); \
|
||||
__typeof__ (b) _b = (b); \
|
||||
_a > _b ? _a : _b; }) |
||||
|
||||
|
||||
#define PANDA |
||||
#define static |
||||
#include "safety.h" |
||||
|
||||
void set_controls_allowed(int c){ |
||||
controls_allowed = c; |
||||
} |
||||
|
||||
void reset_angle_control(void){ |
||||
angle_control = 0; |
||||
} |
||||
|
||||
int get_controls_allowed(void){ |
||||
return controls_allowed; |
||||
} |
||||
|
||||
void set_timer(int t){ |
||||
timer.CNT = t; |
||||
} |
||||
|
||||
void set_torque_meas(int min, int max){ |
||||
torque_meas.min = min; |
||||
torque_meas.max = max; |
||||
} |
||||
|
||||
int get_torque_meas_min(void){ |
||||
return torque_meas.min; |
||||
} |
||||
|
||||
int get_torque_meas_max(void){ |
||||
return torque_meas.max; |
||||
} |
||||
|
||||
void set_rt_torque_last(int t){ |
||||
rt_torque_last = t; |
||||
} |
||||
|
||||
void set_desired_torque_last(int t){ |
||||
desired_torque_last = t; |
||||
} |
||||
|
||||
int get_ego_speed(void){ |
||||
return ego_speed; |
||||
} |
||||
|
||||
int get_brake_prev(void){ |
||||
return brake_prev; |
||||
} |
||||
|
||||
int get_gas_prev(void){ |
||||
return gas_prev; |
||||
} |
||||
|
||||
void init_tests_toyota(void){ |
||||
torque_meas.min = 0; |
||||
torque_meas.max = 0; |
||||
desired_torque_last = 0; |
||||
rt_torque_last = 0; |
||||
ts_last = 0; |
||||
set_timer(0); |
||||
} |
||||
|
||||
void init_tests_honda(void){ |
||||
ego_speed = 0; |
||||
gas_interceptor_detected = 0; |
||||
brake_prev = 0; |
||||
gas_prev = 0; |
||||
} |
@ -0,0 +1,2 @@ |
||||
#!/usr/bin/env sh |
||||
python -m unittest discover . |
@ -0,0 +1,148 @@ |
||||
#!/usr/bin/env python2 |
||||
import unittest |
||||
import numpy as np |
||||
import libpandasafety_py |
||||
|
||||
|
||||
class TestHondaSafety(unittest.TestCase): |
||||
@classmethod |
||||
def setUp(cls): |
||||
cls.safety = libpandasafety_py.libpandasafety |
||||
cls.safety.honda_init(0) |
||||
cls.safety.init_tests_honda() |
||||
|
||||
def _speed_msg(self, speed): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x158 << 21 |
||||
to_send[0].RDLR = speed |
||||
|
||||
return to_send |
||||
|
||||
def _button_msg(self, buttons): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x1A6 << 21 |
||||
to_send[0].RDLR = buttons << 5 |
||||
|
||||
return to_send |
||||
|
||||
def _brake_msg(self, brake): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x17C << 21 |
||||
to_send[0].RDHR = 0x200000 if brake else 0 |
||||
|
||||
return to_send |
||||
|
||||
def _gas_msg(self, gas): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x17C << 21 |
||||
to_send[0].RDLR = 1 if gas else 0 |
||||
|
||||
return to_send |
||||
|
||||
def _send_brake_msg(self, brake): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x1FA << 21 |
||||
to_send[0].RDLR = brake |
||||
|
||||
return to_send |
||||
|
||||
def _send_gas_msg(self, gas): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x200 << 21 |
||||
to_send[0].RDLR = gas |
||||
|
||||
return to_send |
||||
|
||||
def _send_steer_msg(self, steer): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0xE4 << 21 |
||||
to_send[0].RDLR = steer |
||||
|
||||
return to_send |
||||
|
||||
def test_default_controls_not_allowed(self): |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
def test_resume_button(self): |
||||
RESUME_BTN = 4 |
||||
self.safety.honda_rx_hook(self._button_msg(RESUME_BTN)) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
def test_set_button(self): |
||||
SET_BTN = 3 |
||||
self.safety.honda_rx_hook(self._button_msg(SET_BTN)) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
def test_cancel_button(self): |
||||
CANCEL_BTN = 2 |
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.honda_rx_hook(self._button_msg(CANCEL_BTN)) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
def test_sample_speed(self): |
||||
self.assertEqual(0, self.safety.get_ego_speed()) |
||||
self.safety.honda_rx_hook(self._speed_msg(100)) |
||||
self.assertEqual(100, self.safety.get_ego_speed()) |
||||
|
||||
def test_prev_brake(self): |
||||
self.assertFalse(self.safety.get_brake_prev()) |
||||
self.safety.honda_rx_hook(self._brake_msg(True)) |
||||
self.assertTrue(self.safety.get_brake_prev()) |
||||
|
||||
def test_disengage_on_brake(self): |
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.honda_rx_hook(self._brake_msg(1)) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
def test_allow_brake_at_zero_speed(self): |
||||
# Brake was already pressed |
||||
self.safety.honda_rx_hook(self._brake_msg(True)) |
||||
self.safety.set_controls_allowed(1) |
||||
|
||||
self.safety.honda_rx_hook(self._brake_msg(True)) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
def test_not_allow_brake_when_moving(self): |
||||
# Brake was already pressed |
||||
self.safety.honda_rx_hook(self._brake_msg(True)) |
||||
self.safety.honda_rx_hook(self._speed_msg(100)) |
||||
self.safety.set_controls_allowed(1) |
||||
|
||||
self.safety.honda_rx_hook(self._brake_msg(True)) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
def test_prev_gas(self): |
||||
self.assertFalse(self.safety.get_gas_prev()) |
||||
self.safety.honda_rx_hook(self._gas_msg(True)) |
||||
self.assertTrue(self.safety.get_gas_prev()) |
||||
|
||||
def test_disengage_on_gas(self): |
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.honda_rx_hook(self._gas_msg(1)) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
def test_allow_engage_with_gas_pressed(self): |
||||
self.safety.honda_rx_hook(self._gas_msg(1)) |
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.honda_rx_hook(self._gas_msg(1)) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
def test_brake_safety_check(self): |
||||
self.assertTrue(self.safety.honda_tx_hook(self._send_brake_msg(0x0000))) |
||||
self.assertFalse(self.safety.honda_tx_hook(self._send_brake_msg(0x1000))) |
||||
|
||||
self.safety.set_controls_allowed(1) |
||||
self.assertTrue(self.safety.honda_tx_hook(self._send_brake_msg(0x1000))) |
||||
self.assertFalse(self.safety.honda_tx_hook(self._send_brake_msg(0x00F0))) |
||||
|
||||
def test_gas_safety_check(self): |
||||
self.assertTrue(self.safety.honda_tx_hook(self._send_brake_msg(0x0000))) |
||||
self.assertFalse(self.safety.honda_tx_hook(self._send_brake_msg(0x1000))) |
||||
|
||||
def test_steer_safety_check(self): |
||||
self.assertTrue(self.safety.honda_tx_hook(self._send_steer_msg(0x0000))) |
||||
self.assertFalse(self.safety.honda_tx_hook(self._send_steer_msg(0x1000))) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
@ -0,0 +1,411 @@ |
||||
#!/usr/bin/env python2 |
||||
import unittest |
||||
import numpy as np |
||||
import libpandasafety_py |
||||
|
||||
MAX_RATE_UP = 10 |
||||
MAX_RATE_DOWN = 25 |
||||
MAX_TORQUE = 1500 |
||||
|
||||
MAX_ACCEL = 1500 |
||||
MIN_ACCEL = -3000 |
||||
|
||||
MAX_RT_DELTA = 375 |
||||
RT_INTERVAL = 250000 |
||||
|
||||
MAX_TORQUE_ERROR = 350 |
||||
|
||||
IPAS_OVERRIDE_THRESHOLD = 200 |
||||
|
||||
ANGLE_DELTA_BP = [0., 5., 15.] |
||||
ANGLE_DELTA_V = [5., .8, .15] # windup limit |
||||
ANGLE_DELTA_VU = [5., 3.5, 0.4] # unwind limit |
||||
|
||||
def twos_comp(val, bits): |
||||
if val >= 0: |
||||
return val |
||||
else: |
||||
return (2**bits) + val |
||||
|
||||
def sign(a): |
||||
if a > 0: |
||||
return 1 |
||||
else: |
||||
return -1 |
||||
|
||||
class TestToyotaSafety(unittest.TestCase): |
||||
@classmethod |
||||
def setUp(cls): |
||||
cls.safety = libpandasafety_py.libpandasafety |
||||
cls.safety.toyota_init(100) |
||||
cls.safety.init_tests_toyota() |
||||
|
||||
def _set_prev_torque(self, t): |
||||
self.safety.set_desired_torque_last(t) |
||||
self.safety.set_rt_torque_last(t) |
||||
self.safety.set_torque_meas(t, t) |
||||
|
||||
def _torque_meas_msg(self, torque): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x260 << 21 |
||||
|
||||
t = twos_comp(torque, 16) |
||||
to_send[0].RDHR = t | ((t & 0xFF) << 16) |
||||
return to_send |
||||
|
||||
def _torque_driver_msg(self, torque): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x260 << 21 |
||||
|
||||
t = twos_comp(torque, 16) |
||||
to_send[0].RDLR = t | ((t & 0xFF) << 16) |
||||
return to_send |
||||
|
||||
def _torque_driver_msg_array(self, torque): |
||||
for i in range(3): |
||||
self.safety.toyota_ipas_rx_hook(self._torque_driver_msg(torque)) |
||||
|
||||
def _angle_meas_msg(self, angle): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x25 << 21 |
||||
|
||||
t = twos_comp(angle, 12) |
||||
to_send[0].RDLR = ((t & 0xF00) >> 8) | ((t & 0xFF) << 8) |
||||
return to_send |
||||
|
||||
def _angle_meas_msg_array(self, angle): |
||||
for i in range(3): |
||||
self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(angle)) |
||||
|
||||
def _torque_msg(self, torque): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x2E4 << 21 |
||||
|
||||
t = twos_comp(torque, 16) |
||||
to_send[0].RDLR = t | ((t & 0xFF) << 16) |
||||
return to_send |
||||
|
||||
def _ipas_state_msg(self, state): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x262 << 21 |
||||
|
||||
to_send[0].RDLR = state & 0xF |
||||
return to_send |
||||
|
||||
def _ipas_control_msg(self, angle, state): |
||||
# note: we command 2/3 of the angle due to CAN conversion |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x266 << 21 |
||||
|
||||
t = twos_comp(angle, 12) |
||||
to_send[0].RDLR = ((t & 0xF00) >> 8) | ((t & 0xFF) << 8) |
||||
to_send[0].RDLR |= ((state & 0xf) << 4) |
||||
|
||||
return to_send |
||||
|
||||
def _speed_msg(self, speed): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0xb4 << 21 |
||||
speed = int(speed * 100 * 3.6) |
||||
|
||||
to_send[0].RDHR = ((speed & 0xFF) << 16) | (speed & 0xFF00) |
||||
return to_send |
||||
|
||||
def _accel_msg(self, accel): |
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_send[0].RIR = 0x343 << 21 |
||||
|
||||
a = twos_comp(accel, 16) |
||||
to_send[0].RDLR = (a & 0xFF) << 8 | (a >> 8) |
||||
return to_send |
||||
|
||||
def test_default_controls_not_allowed(self): |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
def test_manually_enable_controls_allowed(self): |
||||
self.safety.set_controls_allowed(1) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
def test_enable_control_allowed_from_cruise(self): |
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_push[0].RIR = 0x1D2 << 21 |
||||
to_push[0].RDHR = 0xF00000 |
||||
|
||||
self.safety.toyota_rx_hook(to_push) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
def test_disable_control_allowed_from_cruise(self): |
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||
to_push[0].RIR = 0x1D2 << 21 |
||||
to_push[0].RDHR = 0 |
||||
|
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.toyota_rx_hook(to_push) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
def test_accel_actuation_limits(self): |
||||
for accel in np.arange(MIN_ACCEL - 1000, MAX_ACCEL + 1000, 100): |
||||
for controls_allowed in [True, False]: |
||||
self.safety.set_controls_allowed(controls_allowed) |
||||
|
||||
if controls_allowed: |
||||
send = MIN_ACCEL <= accel <= MAX_ACCEL |
||||
else: |
||||
send = accel == 0 |
||||
self.assertEqual(send, self.safety.toyota_tx_hook(self._accel_msg(accel))) |
||||
|
||||
def test_torque_absolute_limits(self): |
||||
for controls_allowed in [True, False]: |
||||
for torque in np.arange(-MAX_TORQUE - 1000, MAX_TORQUE + 1000, MAX_RATE_UP): |
||||
self.safety.set_controls_allowed(controls_allowed) |
||||
self.safety.set_rt_torque_last(torque) |
||||
self.safety.set_torque_meas(torque, torque) |
||||
self.safety.set_desired_torque_last(torque - MAX_RATE_UP) |
||||
|
||||
if controls_allowed: |
||||
send = (-MAX_TORQUE <= torque <= MAX_TORQUE) |
||||
else: |
||||
send = torque == 0 |
||||
|
||||
self.assertEqual(send, self.safety.toyota_tx_hook(self._torque_msg(torque))) |
||||
|
||||
def test_non_realtime_limit_up(self): |
||||
self.safety.set_controls_allowed(True) |
||||
|
||||
self._set_prev_torque(0) |
||||
self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(MAX_RATE_UP))) |
||||
|
||||
self._set_prev_torque(0) |
||||
self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(MAX_RATE_UP + 1))) |
||||
|
||||
def test_non_realtime_limit_down(self): |
||||
self.safety.set_controls_allowed(True) |
||||
|
||||
self.safety.set_rt_torque_last(1000) |
||||
self.safety.set_torque_meas(500, 500) |
||||
self.safety.set_desired_torque_last(1000) |
||||
self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN))) |
||||
|
||||
self.safety.set_rt_torque_last(1000) |
||||
self.safety.set_torque_meas(500, 500) |
||||
self.safety.set_desired_torque_last(1000) |
||||
self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN + 1))) |
||||
|
||||
def test_exceed_torque_sensor(self): |
||||
self.safety.set_controls_allowed(True) |
||||
|
||||
for sign in [-1, 1]: |
||||
self._set_prev_torque(0) |
||||
for t in np.arange(0, MAX_TORQUE_ERROR + 10, 10): |
||||
t *= sign |
||||
self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(t))) |
||||
|
||||
self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(sign * (MAX_TORQUE_ERROR + 10)))) |
||||
|
||||
def test_realtime_limit_up(self): |
||||
self.safety.set_controls_allowed(True) |
||||
|
||||
for sign in [-1, 1]: |
||||
self.safety.init_tests_toyota() |
||||
self._set_prev_torque(0) |
||||
for t in np.arange(0, 380, 10): |
||||
t *= sign |
||||
self.safety.set_torque_meas(t, t) |
||||
self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(t))) |
||||
self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(sign * 380))) |
||||
|
||||
self._set_prev_torque(0) |
||||
for t in np.arange(0, 370, 10): |
||||
t *= sign |
||||
self.safety.set_torque_meas(t, t) |
||||
self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(t))) |
||||
|
||||
# Increase timer to update rt_torque_last |
||||
self.safety.set_timer(RT_INTERVAL + 1) |
||||
self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(sign * 370))) |
||||
self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(sign * 380))) |
||||
|
||||
def test_torque_measurements(self): |
||||
self.safety.toyota_rx_hook(self._torque_meas_msg(50)) |
||||
self.safety.toyota_rx_hook(self._torque_meas_msg(-50)) |
||||
self.safety.toyota_rx_hook(self._torque_meas_msg(0)) |
||||
|
||||
self.assertEqual(-51, self.safety.get_torque_meas_min()) |
||||
self.assertEqual(51, self.safety.get_torque_meas_max()) |
||||
|
||||
self.safety.toyota_rx_hook(self._torque_meas_msg(0)) |
||||
self.assertEqual(-1, self.safety.get_torque_meas_max()) |
||||
self.assertEqual(-51, self.safety.get_torque_meas_min()) |
||||
|
||||
self.safety.toyota_rx_hook(self._torque_meas_msg(0)) |
||||
self.assertEqual(-1, self.safety.get_torque_meas_max()) |
||||
self.assertEqual(-1, self.safety.get_torque_meas_min()) |
||||
|
||||
def test_ipas_override(self): |
||||
|
||||
## angle control is not active |
||||
self.safety.set_controls_allowed(1) |
||||
|
||||
# 3 consecutive msgs where driver exceeds threshold but angle_control isn't active |
||||
self.safety.set_controls_allowed(1) |
||||
self._torque_driver_msg_array(IPAS_OVERRIDE_THRESHOLD + 1) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
self._torque_driver_msg_array(-IPAS_OVERRIDE_THRESHOLD - 1) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
# ipas state is override |
||||
self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(5)) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
## now angle control is active |
||||
self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 0)) |
||||
self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(0)) |
||||
|
||||
# 3 consecutive msgs where driver does exceed threshold |
||||
self.safety.set_controls_allowed(1) |
||||
self._torque_driver_msg_array(IPAS_OVERRIDE_THRESHOLD + 1) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
self.safety.set_controls_allowed(1) |
||||
self._torque_driver_msg_array(-IPAS_OVERRIDE_THRESHOLD - 1) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
# ipas state is override and torque isn't overriding any more |
||||
self.safety.set_controls_allowed(1) |
||||
self._torque_driver_msg_array(0) |
||||
self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(5)) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
# 3 consecutive msgs where driver does not exceed threshold and |
||||
# ipas state is not override |
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(0)) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
self._torque_driver_msg_array(IPAS_OVERRIDE_THRESHOLD) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
self._torque_driver_msg_array(-IPAS_OVERRIDE_THRESHOLD) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
# reset no angle control at the end of the test |
||||
self.safety.reset_angle_control() |
||||
|
||||
def test_angle_cmd_when_disabled(self): |
||||
|
||||
self.safety.set_controls_allowed(0) |
||||
|
||||
# test angle cmd too far from actual |
||||
angle_refs = [-10, 10] |
||||
deltas = range(-2, 3) |
||||
expected_results = [False, True, True, True, False] |
||||
|
||||
for a in angle_refs: |
||||
self._angle_meas_msg_array(a) |
||||
for i, d in enumerate(deltas): |
||||
self.assertEqual(expected_results[i], self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + d, 1))) |
||||
|
||||
# test ipas state cmd enabled |
||||
self._angle_meas_msg_array(0) |
||||
self.assertEqual(0, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 3))) |
||||
|
||||
# reset no angle control at the end of the test |
||||
self.safety.reset_angle_control() |
||||
|
||||
def test_angle_cmd_when_enabled(self): |
||||
|
||||
# ipas angle cmd should pass through when controls are enabled |
||||
|
||||
self.safety.set_controls_allowed(1) |
||||
self._angle_meas_msg_array(0) |
||||
self.safety.toyota_ipas_rx_hook(self._speed_msg(0.1)) |
||||
|
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 1))) |
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(4, 1))) |
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 3))) |
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(-4, 3))) |
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(-8, 3))) |
||||
|
||||
# reset no angle control at the end of the test |
||||
self.safety.reset_angle_control() |
||||
|
||||
|
||||
def test_angle_cmd_rate_when_disabled(self): |
||||
|
||||
# as long as the command is close to the measured, no rate limit is enforced when |
||||
# controls are disabled |
||||
self.safety.set_controls_allowed(0) |
||||
self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(0)) |
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 1))) |
||||
self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(100)) |
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(100, 1))) |
||||
self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(-100)) |
||||
self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(-100, 1))) |
||||
|
||||
# reset no angle control at the end of the test |
||||
self.safety.reset_angle_control() |
||||
|
||||
def test_angle_cmd_rate_when_enabled(self): |
||||
|
||||
# when controls are allowed, angle cmd rate limit is enforced |
||||
# test 1: no limitations if we stay within limits |
||||
speeds = [0., 1., 5., 10., 15., 100.] |
||||
angles = [-300, -100, -10, 0, 10, 100, 300] |
||||
for a in angles: |
||||
for s in speeds: |
||||
|
||||
# first test against false positives |
||||
self._angle_meas_msg_array(a) |
||||
self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1)) |
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.toyota_ipas_rx_hook(self._speed_msg(s)) |
||||
max_delta_up = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_V) * 2 / 3. + 1.) |
||||
max_delta_down = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_VU) * 2 / 3. + 1.) |
||||
self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + sign(a) * max_delta_up, 1))) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1))) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a - sign(a) * max_delta_down, 1))) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
|
||||
# now inject too high rates |
||||
self.assertEqual(False, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + sign(a) * |
||||
(max_delta_up + 1), 1))) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
self.safety.set_controls_allowed(1) |
||||
self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + sign(a) * max_delta_up, 1))) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1))) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
self.assertEqual(False, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a - sign(a) * |
||||
(max_delta_down + 1), 1))) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
# reset no angle control at the end of the test |
||||
self.safety.reset_angle_control() |
||||
|
||||
def test_angle_measured_rate(self): |
||||
|
||||
speeds = [0., 1., 5., 10., 15., 100.] |
||||
angles = [-300, -100, -10, 0, 10, 100, 300] |
||||
angles = [10] |
||||
for a in angles: |
||||
for s in speeds: |
||||
self._angle_meas_msg_array(a) |
||||
self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1)) |
||||
self.safety.set_controls_allowed(1) |
||||
self.safety.toyota_ipas_rx_hook(self._speed_msg(s)) |
||||
max_delta_up = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_V) * 2 / 3. + 1.) |
||||
max_delta_down = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_VU) * 2 / 3. + 1.) |
||||
self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(a)) |
||||
self.assertTrue(self.safety.get_controls_allowed()) |
||||
self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(a + 150)) |
||||
self.assertFalse(self.safety.get_controls_allowed()) |
||||
|
||||
# reset no angle control at the end of the test |
||||
self.safety.reset_angle_control() |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
Loading…
Reference in new issue