commit
c210011c84
24 changed files with 1291 additions and 139 deletions
@ -1 +1 @@ |
|||||||
v1.1.1 |
v1.1.2 |
@ -0,0 +1,203 @@ |
|||||||
|
#define MAX_BITS_CAN_PACKET (200) |
||||||
|
|
||||||
|
// returns out_len
|
||||||
|
int do_bitstuff(char *out, char *in, int in_len) { |
||||||
|
int last_bit = -1; |
||||||
|
int bit_cnt = 0; |
||||||
|
int j = 0; |
||||||
|
for (int i = 0; i < in_len; i++) { |
||||||
|
char bit = in[i]; |
||||||
|
out[j++] = bit; |
||||||
|
|
||||||
|
// do the stuffing
|
||||||
|
if (bit == last_bit) { |
||||||
|
bit_cnt++; |
||||||
|
if (bit_cnt == 5) { |
||||||
|
// 5 in a row the same, do stuff
|
||||||
|
last_bit = !bit; |
||||||
|
out[j++] = last_bit; |
||||||
|
bit_cnt = 1; |
||||||
|
} |
||||||
|
} else { |
||||||
|
// this is a new bit
|
||||||
|
last_bit = bit; |
||||||
|
bit_cnt = 1; |
||||||
|
} |
||||||
|
} |
||||||
|
return j; |
||||||
|
} |
||||||
|
|
||||||
|
int append_crc(char *in, int in_len) { |
||||||
|
int crc = 0; |
||||||
|
for (int i = 0; i < in_len; i++) { |
||||||
|
crc <<= 1; |
||||||
|
if (in[i] ^ ((crc>>15)&1)) { |
||||||
|
crc = crc ^ 0x4599; |
||||||
|
} |
||||||
|
crc &= 0x7fff; |
||||||
|
} |
||||||
|
for (int i = 14; i >= 0; i--) { |
||||||
|
in[in_len++] = (crc>>i)&1; |
||||||
|
} |
||||||
|
return in_len; |
||||||
|
} |
||||||
|
|
||||||
|
int append_bits(char *in, int in_len, char *app, int app_len) { |
||||||
|
for (int i = 0; i < app_len; i++) { |
||||||
|
in[in_len++] = app[i]; |
||||||
|
} |
||||||
|
return in_len; |
||||||
|
} |
||||||
|
|
||||||
|
int append_int(char *in, int in_len, int val, int val_len) { |
||||||
|
for (int i = val_len-1; i >= 0; i--) { |
||||||
|
in[in_len++] = (val&(1<<i)) != 0; |
||||||
|
} |
||||||
|
return in_len; |
||||||
|
} |
||||||
|
|
||||||
|
int get_bit_message(char *out, CAN_FIFOMailBox_TypeDef *to_bang) { |
||||||
|
char pkt[MAX_BITS_CAN_PACKET]; |
||||||
|
char footer[] = { |
||||||
|
1, // CRC delimiter
|
||||||
|
1, // ACK
|
||||||
|
1, // ACK delimiter
|
||||||
|
1,1,1,1,1,1,1, // EOF
|
||||||
|
1,1,1, // IFS
|
||||||
|
}; |
||||||
|
|
||||||
|
int len = 0; |
||||||
|
|
||||||
|
// test packet
|
||||||
|
int dlc_len = to_bang->RDTR & 0xF; |
||||||
|
len = append_int(pkt, len, 0, 1); // Start-of-frame
|
||||||
|
|
||||||
|
if (to_bang->RIR & 4) { |
||||||
|
// extended identifier
|
||||||
|
len = append_int(pkt, len, to_bang->RIR >> 21, 11); // Identifier
|
||||||
|
len = append_int(pkt, len, 3, 2); // SRR+IDE
|
||||||
|
len = append_int(pkt, len, (to_bang->RIR >> 3) & ((1<<18)-1), 18); // Identifier
|
||||||
|
len = append_int(pkt, len, 0, 3); // RTR+r1+r0
|
||||||
|
} else { |
||||||
|
// standard identifier
|
||||||
|
len = append_int(pkt, len, to_bang->RIR >> 21, 11); // Identifier
|
||||||
|
len = append_int(pkt, len, 0, 3); // RTR+IDE+reserved
|
||||||
|
} |
||||||
|
|
||||||
|
len = append_int(pkt, len, dlc_len, 4); // Data length code
|
||||||
|
|
||||||
|
// append data
|
||||||
|
for (int i = 0; i < dlc_len; i++) { |
||||||
|
unsigned char dat = ((unsigned char *)(&(to_bang->RDLR)))[i]; |
||||||
|
len = append_int(pkt, len, dat, 8); |
||||||
|
} |
||||||
|
|
||||||
|
// append crc
|
||||||
|
len = append_crc(pkt, len); |
||||||
|
|
||||||
|
// do bitstuffing
|
||||||
|
len = do_bitstuff(out, pkt, len); |
||||||
|
|
||||||
|
// append footer
|
||||||
|
len = append_bits(out, len, footer, sizeof(footer)); |
||||||
|
return len; |
||||||
|
} |
||||||
|
|
||||||
|
// hardware stuff below this line
|
||||||
|
|
||||||
|
#ifdef PANDA |
||||||
|
|
||||||
|
void set_bitbanged_gmlan(int val) { |
||||||
|
if (val) { |
||||||
|
GPIOB->ODR |= (1 << 13); |
||||||
|
} else { |
||||||
|
GPIOB->ODR &= ~(1 << 13); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
char pkt_stuffed[MAX_BITS_CAN_PACKET]; |
||||||
|
int gmlan_sending = -1; |
||||||
|
int gmlan_sendmax = -1; |
||||||
|
|
||||||
|
int gmlan_silent_count = 0; |
||||||
|
int gmlan_fail_count = 0; |
||||||
|
#define REQUIRED_SILENT_TIME 10 |
||||||
|
#define MAX_FAIL_COUNT 10 |
||||||
|
|
||||||
|
void TIM4_IRQHandler(void) { |
||||||
|
if (TIM4->SR & TIM_SR_UIF && gmlan_sendmax != -1) { |
||||||
|
int read = get_gpio_input(GPIOB, 12); |
||||||
|
if (gmlan_silent_count < REQUIRED_SILENT_TIME) { |
||||||
|
if (read == 0) { |
||||||
|
gmlan_silent_count = 0; |
||||||
|
} else { |
||||||
|
gmlan_silent_count++; |
||||||
|
} |
||||||
|
} else if (gmlan_silent_count == REQUIRED_SILENT_TIME) { |
||||||
|
int retry = 0; |
||||||
|
// in send loop
|
||||||
|
if (gmlan_sending > 0 && // not first bit
|
||||||
|
(read == 0 && pkt_stuffed[gmlan_sending-1] == 1) && // bus wrongly dominant
|
||||||
|
gmlan_sending != (gmlan_sendmax-11)) { //not ack bit
|
||||||
|
puts("GMLAN ERR: bus driven at "); |
||||||
|
puth(gmlan_sending); |
||||||
|
puts("\n"); |
||||||
|
retry = 1; |
||||||
|
} else if (read == 1 && gmlan_sending == (gmlan_sendmax-11)) { // recessive during ACK
|
||||||
|
puts("GMLAN ERR: didn't recv ACK\n"); |
||||||
|
retry = 1; |
||||||
|
} |
||||||
|
if (retry) { |
||||||
|
// reset sender (retry after 7 silent)
|
||||||
|
set_bitbanged_gmlan(1); // recessive
|
||||||
|
gmlan_silent_count = 0; |
||||||
|
gmlan_sending = 0; |
||||||
|
gmlan_fail_count++; |
||||||
|
if (gmlan_fail_count == MAX_FAIL_COUNT) { |
||||||
|
puts("GMLAN ERR: giving up send\n"); |
||||||
|
} |
||||||
|
} else { |
||||||
|
set_bitbanged_gmlan(pkt_stuffed[gmlan_sending]); |
||||||
|
gmlan_sending++; |
||||||
|
} |
||||||
|
} |
||||||
|
if (gmlan_sending == gmlan_sendmax || gmlan_fail_count == MAX_FAIL_COUNT) { |
||||||
|
set_bitbanged_gmlan(1); // recessive
|
||||||
|
set_gpio_mode(GPIOB, 13, MODE_INPUT); |
||||||
|
TIM4->DIER = 0; // no update interrupt
|
||||||
|
TIM4->CR1 = 0; // disable timer
|
||||||
|
gmlan_sendmax = -1; // exit
|
||||||
|
} |
||||||
|
} |
||||||
|
TIM4->SR = 0; |
||||||
|
} |
||||||
|
|
||||||
|
void bitbang_gmlan(CAN_FIFOMailBox_TypeDef *to_bang) { |
||||||
|
// TODO: make failure less silent
|
||||||
|
if (gmlan_sendmax != -1) return; |
||||||
|
|
||||||
|
int len = get_bit_message(pkt_stuffed, to_bang); |
||||||
|
gmlan_fail_count = 0; |
||||||
|
gmlan_silent_count = 0; |
||||||
|
gmlan_sending = 0; |
||||||
|
gmlan_sendmax = len; |
||||||
|
|
||||||
|
// setup for bitbang loop
|
||||||
|
set_bitbanged_gmlan(1); // recessive
|
||||||
|
set_gpio_mode(GPIOB, 13, MODE_OUTPUT); |
||||||
|
|
||||||
|
// setup
|
||||||
|
TIM4->PSC = 48-1; // tick on 1 us
|
||||||
|
TIM4->CR1 = TIM_CR1_CEN; // enable
|
||||||
|
TIM4->ARR = 30-1; // 33.3 kbps
|
||||||
|
|
||||||
|
// in case it's disabled
|
||||||
|
NVIC_EnableIRQ(TIM4_IRQn); |
||||||
|
|
||||||
|
// run the interrupt
|
||||||
|
TIM4->DIER = TIM_DIER_UIE; // update interrupt
|
||||||
|
TIM4->SR = 0; |
||||||
|
} |
||||||
|
|
||||||
|
#endif |
||||||
|
|
@ -0,0 +1,131 @@ |
|||||||
|
const int CADILLAC_MAX_STEER = 150; // 1s
|
||||||
|
// real time torque limit to prevent controls spamming
|
||||||
|
// the real time limit is 1500/sec
|
||||||
|
const int CADILLAC_MAX_RT_DELTA = 75; // max delta torque allowed for real time checks
|
||||||
|
const int32_t CADILLAC_RT_INTERVAL = 250000; // 250ms between real time checks
|
||||||
|
const int CADILLAC_MAX_RATE_UP = 2; |
||||||
|
const int CADILLAC_MAX_RATE_DOWN = 5; |
||||||
|
const int CADILLAC_DRIVER_TORQUE_ALLOWANCE = 50; |
||||||
|
const int CADILLAC_DRIVER_TORQUE_FACTOR = 4; |
||||||
|
|
||||||
|
int cadillac_ign = 0; |
||||||
|
int cadillac_cruise_engaged_last = 0; |
||||||
|
int cadillac_rt_torque_last = 0; |
||||||
|
int cadillac_desired_torque_last[4] = {0}; // 4 torque messages
|
||||||
|
uint32_t cadillac_ts_last = 0; |
||||||
|
int cadillac_supercruise_on = 0; |
||||||
|
struct sample_t cadillac_torque_driver; // last few driver torques measured
|
||||||
|
|
||||||
|
int cadillac_get_torque_idx(uint32_t addr) { |
||||||
|
if (addr==0x151) return 0; |
||||||
|
else if (addr==0x152) return 1; |
||||||
|
else if (addr==0x153) return 2; |
||||||
|
else return 3; |
||||||
|
} |
||||||
|
|
||||||
|
static void cadillac_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) { |
||||||
|
int bus_number = (to_push->RDTR >> 4) & 0xFF; |
||||||
|
uint32_t addr = to_push->RIR >> 21; |
||||||
|
|
||||||
|
if (addr == 356) { |
||||||
|
int torque_driver_new = ((to_push->RDLR & 0x7) << 8) | ((to_push->RDLR >> 8) & 0xFF); |
||||||
|
torque_driver_new = to_signed(torque_driver_new, 11); |
||||||
|
// update array of samples
|
||||||
|
update_sample(&cadillac_torque_driver, torque_driver_new); |
||||||
|
} |
||||||
|
|
||||||
|
// this message isn't all zeros when ignition is on
|
||||||
|
if (addr == 0x160 && bus_number == 0) { |
||||||
|
cadillac_ign = to_push->RDLR > 0; |
||||||
|
} |
||||||
|
|
||||||
|
// enter controls on rising edge of ACC, exit controls on ACC off
|
||||||
|
if ((addr == 0x370) && (bus_number == 0)) { |
||||||
|
int cruise_engaged = to_push->RDLR & 0x800000; // bit 23
|
||||||
|
if (cruise_engaged && !cadillac_cruise_engaged_last) { |
||||||
|
controls_allowed = 1; |
||||||
|
} else if (!cruise_engaged) { |
||||||
|
controls_allowed = 0; |
||||||
|
} |
||||||
|
cadillac_cruise_engaged_last = cruise_engaged; |
||||||
|
} |
||||||
|
|
||||||
|
// know supercruise mode and block openpilot msgs if on
|
||||||
|
if ((addr == 0x152) || (addr == 0x154)) { |
||||||
|
cadillac_supercruise_on = (to_push->RDHR>>4) & 0x1; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static int cadillac_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) { |
||||||
|
uint32_t addr = to_send->RIR >> 21; |
||||||
|
|
||||||
|
// steer cmd checks
|
||||||
|
if (addr == 0x151 || addr == 0x152 || addr == 0x153 || addr == 0x154) { |
||||||
|
int desired_torque = ((to_send->RDLR & 0x3f) << 8) + ((to_send->RDLR & 0xff00) >> 8); |
||||||
|
int violation = 0; |
||||||
|
uint32_t ts = TIM2->CNT; |
||||||
|
int idx = cadillac_get_torque_idx(addr); |
||||||
|
desired_torque = to_signed(desired_torque, 14); |
||||||
|
|
||||||
|
if (controls_allowed) { |
||||||
|
|
||||||
|
// *** global torque limit check ***
|
||||||
|
violation |= max_limit_check(desired_torque, CADILLAC_MAX_STEER); |
||||||
|
|
||||||
|
// *** torque rate limit check ***
|
||||||
|
int desired_torque_last = cadillac_desired_torque_last[idx]; |
||||||
|
violation |= driver_limit_check(desired_torque, desired_torque_last, &cadillac_torque_driver, |
||||||
|
CADILLAC_MAX_STEER, CADILLAC_MAX_RATE_UP, CADILLAC_MAX_RATE_DOWN, |
||||||
|
CADILLAC_DRIVER_TORQUE_ALLOWANCE, CADILLAC_DRIVER_TORQUE_FACTOR); |
||||||
|
|
||||||
|
// used next time
|
||||||
|
cadillac_desired_torque_last[idx] = desired_torque; |
||||||
|
|
||||||
|
// *** torque real time rate limit check ***
|
||||||
|
violation |= rt_rate_limit_check(desired_torque, cadillac_rt_torque_last, CADILLAC_MAX_RT_DELTA); |
||||||
|
|
||||||
|
// every RT_INTERVAL set the new limits
|
||||||
|
uint32_t ts_elapsed = get_ts_elapsed(ts, cadillac_ts_last); |
||||||
|
if (ts_elapsed > CADILLAC_RT_INTERVAL) { |
||||||
|
cadillac_rt_torque_last = desired_torque; |
||||||
|
cadillac_ts_last = ts; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// no torque if controls is not allowed
|
||||||
|
if (!controls_allowed && (desired_torque != 0)) { |
||||||
|
violation = 1; |
||||||
|
} |
||||||
|
|
||||||
|
// reset to 0 if either controls is not allowed or there's a violation
|
||||||
|
if (violation || !controls_allowed) { |
||||||
|
cadillac_desired_torque_last[idx] = 0; |
||||||
|
cadillac_rt_torque_last = 0; |
||||||
|
cadillac_ts_last = ts; |
||||||
|
} |
||||||
|
|
||||||
|
if (violation || cadillac_supercruise_on) { |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
static void cadillac_init(int16_t param) { |
||||||
|
controls_allowed = 0; |
||||||
|
cadillac_ign = 0; |
||||||
|
} |
||||||
|
|
||||||
|
static int cadillac_ign_hook() { |
||||||
|
return cadillac_ign; |
||||||
|
} |
||||||
|
|
||||||
|
const safety_hooks cadillac_hooks = { |
||||||
|
.init = cadillac_init, |
||||||
|
.rx = cadillac_rx_hook, |
||||||
|
.tx = cadillac_tx_hook, |
||||||
|
.tx_lin = alloutput_tx_lin_hook, |
||||||
|
.ignition = cadillac_ign_hook, |
||||||
|
.fwd = alloutput_fwd_hook, |
||||||
|
}; |
@ -0,0 +1,17 @@ |
|||||||
|
#!/usr/bin/env python |
||||||
|
import time |
||||||
|
from panda import Panda |
||||||
|
|
||||||
|
p = Panda() |
||||||
|
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||||
|
p.set_gmlan(bus=2) |
||||||
|
#p.can_send(0xaaa, "\x00\x00", bus=3) |
||||||
|
last_add = None |
||||||
|
while 1: |
||||||
|
ret = p.can_recv() |
||||||
|
if len(ret) > 0: |
||||||
|
add = ret[0][0] |
||||||
|
if last_add is not None and add != last_add+1: |
||||||
|
print "MISS %d %d" % (last_add, add) |
||||||
|
last_add = add |
||||||
|
print ret |
@ -0,0 +1,35 @@ |
|||||||
|
#!/usr/bin/env python |
||||||
|
import numpy as np |
||||||
|
import visa |
||||||
|
import matplotlib.pyplot as plt |
||||||
|
|
||||||
|
resources = visa.ResourceManager() |
||||||
|
print resources.list_resources() |
||||||
|
|
||||||
|
scope = resources.open_resource('USB0::0x1AB1::0x04CE::DS1ZA184652242::INSTR', timeout=2000, chunk_size=1024000) |
||||||
|
print(scope.query('*IDN?').strip()) |
||||||
|
|
||||||
|
#voltscale = scope.ask_for_values(':CHAN1:SCAL?')[0] |
||||||
|
#voltoffset = scope.ask_for_values(":CHAN1:OFFS?")[0] |
||||||
|
|
||||||
|
#scope.write(":STOP") |
||||||
|
scope.write(":WAV:POIN:MODE RAW") |
||||||
|
scope.write(":WAV:DATA? CHAN1")[10:] |
||||||
|
rawdata = scope.read_raw() |
||||||
|
data = np.frombuffer(rawdata, 'B') |
||||||
|
print data.shape |
||||||
|
|
||||||
|
s1 = data[0:650] |
||||||
|
s2 = data[650:] |
||||||
|
s1i = np.argmax(s1 > 100) |
||||||
|
s2i = np.argmax(s2 > 100) |
||||||
|
s1 = s1[s1i:] |
||||||
|
s2 = s2[s2i:] |
||||||
|
|
||||||
|
plt.plot(s1) |
||||||
|
plt.plot(s2) |
||||||
|
plt.show() |
||||||
|
#data = (data - 130.0 - voltoffset/voltscale*25) / 25 * voltscale |
||||||
|
|
||||||
|
print data |
||||||
|
|
@ -0,0 +1,33 @@ |
|||||||
|
#!/usr/bin/env python |
||||||
|
import time |
||||||
|
from panda import Panda |
||||||
|
|
||||||
|
p1 = Panda('380016000551363338383037') |
||||||
|
p2 = Panda('430026000951363338383037') |
||||||
|
|
||||||
|
# this is a test, no safety |
||||||
|
p1.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||||
|
p2.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||||
|
|
||||||
|
# get versions |
||||||
|
print(p1.get_version()) |
||||||
|
print(p2.get_version()) |
||||||
|
|
||||||
|
# this sets bus 2 to actually be GMLAN |
||||||
|
p2.set_gmlan(bus=2) |
||||||
|
|
||||||
|
# send w bitbang then without |
||||||
|
#iden = 123 |
||||||
|
iden = 18000 |
||||||
|
#dat = "\x01\x02" |
||||||
|
dat = "\x01\x02\x03\x04\x05\x06\x07\x08" |
||||||
|
while 1: |
||||||
|
iden += 1 |
||||||
|
p1.set_gmlan(bus=None) |
||||||
|
p1.can_send(iden, dat, bus=3) |
||||||
|
#p1.set_gmlan(bus=2) |
||||||
|
#p1.can_send(iden, dat, bus=3) |
||||||
|
time.sleep(0.01) |
||||||
|
print p2.can_recv() |
||||||
|
#exit(0) |
||||||
|
|
@ -0,0 +1,23 @@ |
|||||||
|
#!/usr/bin/env python |
||||||
|
import time |
||||||
|
from panda import Panda |
||||||
|
|
||||||
|
p = Panda() |
||||||
|
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) |
||||||
|
|
||||||
|
# ack any crap on bus |
||||||
|
p.set_gmlan(bus=2) |
||||||
|
time.sleep(0.1) |
||||||
|
while len(p.can_recv()) > 0: |
||||||
|
print "clearing" |
||||||
|
time.sleep(0.1) |
||||||
|
print "cleared" |
||||||
|
p.set_gmlan(bus=None) |
||||||
|
|
||||||
|
iden = 18000 |
||||||
|
dat = "\x01\x02\x03\x04\x05\x06\x07\x08" |
||||||
|
while 1: |
||||||
|
iden += 1 |
||||||
|
p.can_send(iden, dat, bus=3) |
||||||
|
time.sleep(0.01) |
||||||
|
|
@ -0,0 +1,32 @@ |
|||||||
|
#include <stdio.h> |
||||||
|
#include <stdint.h> |
||||||
|
|
||||||
|
typedef struct { |
||||||
|
uint32_t RIR; /*!< CAN receive FIFO mailbox identifier register */ |
||||||
|
uint32_t RDTR; /*!< CAN receive FIFO mailbox data length control and time stamp register */ |
||||||
|
uint32_t RDLR; /*!< CAN receive FIFO mailbox data low register */ |
||||||
|
uint32_t RDHR; /*!< CAN receive FIFO mailbox data high register */ |
||||||
|
} CAN_FIFOMailBox_TypeDef; |
||||||
|
|
||||||
|
#include "../../board/drivers/canbitbang.h" |
||||||
|
|
||||||
|
int main() { |
||||||
|
char out[300]; |
||||||
|
CAN_FIFOMailBox_TypeDef to_bang = {0}; |
||||||
|
to_bang.RIR = 20 << 21; |
||||||
|
to_bang.RDTR = 1; |
||||||
|
to_bang.RDLR = 1; |
||||||
|
|
||||||
|
int len = get_bit_message(out, &to_bang); |
||||||
|
printf("T:"); |
||||||
|
for (int i = 0; i < len; i++) { |
||||||
|
printf("%d", out[i]); |
||||||
|
} |
||||||
|
printf("\n"); |
||||||
|
printf("R:0000010010100000100010000010011110111010100111111111111111"); |
||||||
|
printf("\n"); |
||||||
|
return 0; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,182 @@ |
|||||||
|
#!/usr/bin/env python2 |
||||||
|
import unittest |
||||||
|
import numpy as np |
||||||
|
import libpandasafety_py |
||||||
|
|
||||||
|
MAX_RATE_UP = 2 |
||||||
|
MAX_RATE_DOWN = 5 |
||||||
|
MAX_TORQUE = 150 |
||||||
|
|
||||||
|
MAX_RT_DELTA = 75 |
||||||
|
RT_INTERVAL = 250000 |
||||||
|
|
||||||
|
DRIVER_TORQUE_ALLOWANCE = 50; |
||||||
|
DRIVER_TORQUE_FACTOR = 4; |
||||||
|
|
||||||
|
IPAS_OVERRIDE_THRESHOLD = 200 |
||||||
|
|
||||||
|
def twos_comp(val, bits): |
||||||
|
if val >= 0: |
||||||
|
return val |
||||||
|
else: |
||||||
|
return (2**bits) + val |
||||||
|
|
||||||
|
def sign(a): |
||||||
|
if a > 0: |
||||||
|
return 1 |
||||||
|
else: |
||||||
|
return -1 |
||||||
|
|
||||||
|
class TestCadillacSafety(unittest.TestCase): |
||||||
|
@classmethod |
||||||
|
def setUp(cls): |
||||||
|
cls.safety = libpandasafety_py.libpandasafety |
||||||
|
cls.safety.cadillac_init(0) |
||||||
|
cls.safety.init_tests_cadillac() |
||||||
|
|
||||||
|
def _set_prev_torque(self, t): |
||||||
|
self.safety.set_cadillac_desired_torque_last(t) |
||||||
|
self.safety.set_cadillac_rt_torque_last(t) |
||||||
|
|
||||||
|
def _torque_driver_msg(self, torque): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 0x164 << 21 |
||||||
|
|
||||||
|
t = twos_comp(torque, 11) |
||||||
|
to_send[0].RDLR = ((t >> 8) & 0x7) | ((t & 0xFF) << 8) |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _torque_driver_msg_array(self, torque): |
||||||
|
for i in range(3): |
||||||
|
self.safety.cadillac_ipas_rx_hook(self._torque_driver_msg(torque)) |
||||||
|
|
||||||
|
def _torque_msg(self, torque): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 0x151 << 21 |
||||||
|
|
||||||
|
t = twos_comp(torque, 14) |
||||||
|
to_send[0].RDLR = ((t >> 8) & 0x3F) | ((t & 0xFF) << 8) |
||||||
|
return to_send |
||||||
|
|
||||||
|
def test_default_controls_not_allowed(self): |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_manually_enable_controls_allowed(self): |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
self.assertTrue(self.safety.get_controls_allowed()) |
||||||
|
self.safety.set_controls_allowed(0) |
||||||
|
|
||||||
|
def test_enable_control_allowed_from_cruise(self): |
||||||
|
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_push[0].RIR = 0x370 << 21 |
||||||
|
to_push[0].RDLR = 0x800000 |
||||||
|
to_push[0].RDTR = 0 |
||||||
|
|
||||||
|
self.safety.cadillac_rx_hook(to_push) |
||||||
|
self.assertTrue(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_disable_control_allowed_from_cruise(self): |
||||||
|
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_push[0].RIR = 0x370 << 21 |
||||||
|
to_push[0].RDLR = 0 |
||||||
|
to_push[0].RDTR = 0 |
||||||
|
|
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
self.safety.cadillac_rx_hook(to_push) |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_torque_absolute_limits(self): |
||||||
|
for controls_allowed in [True, False]: |
||||||
|
for torque in np.arange(-MAX_TORQUE - 1000, MAX_TORQUE + 1000, MAX_RATE_UP): |
||||||
|
self.safety.set_controls_allowed(controls_allowed) |
||||||
|
self.safety.set_cadillac_rt_torque_last(torque) |
||||||
|
self.safety.set_cadillac_torque_driver(0, 0) |
||||||
|
self.safety.set_cadillac_desired_torque_last(torque - MAX_RATE_UP) |
||||||
|
|
||||||
|
if controls_allowed: |
||||||
|
send = (-MAX_TORQUE <= torque <= MAX_TORQUE) |
||||||
|
else: |
||||||
|
send = torque == 0 |
||||||
|
|
||||||
|
self.assertEqual(send, self.safety.cadillac_tx_hook(self._torque_msg(torque))) |
||||||
|
|
||||||
|
def test_non_realtime_limit_up(self): |
||||||
|
self.safety.set_cadillac_torque_driver(0, 0) |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(MAX_RATE_UP))) |
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(-MAX_RATE_UP))) |
||||||
|
|
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertFalse(self.safety.cadillac_tx_hook(self._torque_msg(MAX_RATE_UP + 1))) |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertFalse(self.safety.cadillac_tx_hook(self._torque_msg(-MAX_RATE_UP - 1))) |
||||||
|
|
||||||
|
def test_non_realtime_limit_down(self): |
||||||
|
self.safety.set_cadillac_torque_driver(0, 0) |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
def test_exceed_torque_sensor(self): |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
for sign in [-1, 1]: |
||||||
|
for t in np.arange(0, DRIVER_TORQUE_ALLOWANCE + 1, 1): |
||||||
|
t *= -sign |
||||||
|
self.safety.set_cadillac_torque_driver(t, t) |
||||||
|
self._set_prev_torque(MAX_TORQUE * sign) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(MAX_TORQUE * sign))) |
||||||
|
|
||||||
|
self.safety.set_cadillac_torque_driver(DRIVER_TORQUE_ALLOWANCE + 1, DRIVER_TORQUE_ALLOWANCE + 1) |
||||||
|
self.assertFalse(self.safety.cadillac_tx_hook(self._torque_msg(-MAX_TORQUE))) |
||||||
|
|
||||||
|
# spot check some individual cases |
||||||
|
for sign in [-1, 1]: |
||||||
|
driver_torque = (DRIVER_TORQUE_ALLOWANCE + 10) * sign |
||||||
|
torque_desired = (MAX_TORQUE - 10 * DRIVER_TORQUE_FACTOR) * sign |
||||||
|
delta = 1 * sign |
||||||
|
self._set_prev_torque(torque_desired) |
||||||
|
self.safety.set_cadillac_torque_driver(-driver_torque, -driver_torque) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(torque_desired))) |
||||||
|
self._set_prev_torque(torque_desired + delta) |
||||||
|
self.safety.set_cadillac_torque_driver(-driver_torque, -driver_torque) |
||||||
|
self.assertFalse(self.safety.cadillac_tx_hook(self._torque_msg(torque_desired + delta))) |
||||||
|
|
||||||
|
self._set_prev_torque(MAX_TORQUE * sign) |
||||||
|
self.safety.set_cadillac_torque_driver(-MAX_TORQUE * sign, -MAX_TORQUE * sign) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg((MAX_TORQUE - MAX_RATE_DOWN) * sign))) |
||||||
|
self._set_prev_torque(MAX_TORQUE * sign) |
||||||
|
self.safety.set_cadillac_torque_driver(-MAX_TORQUE * sign, -MAX_TORQUE * sign) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(0))) |
||||||
|
self._set_prev_torque(MAX_TORQUE * sign) |
||||||
|
self.safety.set_cadillac_torque_driver(-MAX_TORQUE * sign, -MAX_TORQUE * sign) |
||||||
|
self.assertFalse(self.safety.cadillac_tx_hook(self._torque_msg((MAX_TORQUE - MAX_RATE_DOWN + 1) * sign))) |
||||||
|
|
||||||
|
|
||||||
|
def test_realtime_limits(self): |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
for sign in [-1, 1]: |
||||||
|
self.safety.init_tests_cadillac() |
||||||
|
self._set_prev_torque(0) |
||||||
|
self.safety.set_cadillac_torque_driver(0, 0) |
||||||
|
for t in np.arange(0, MAX_RT_DELTA, 1): |
||||||
|
t *= sign |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(t))) |
||||||
|
self.assertFalse(self.safety.cadillac_tx_hook(self._torque_msg(sign * (MAX_RT_DELTA + 1)))) |
||||||
|
|
||||||
|
self._set_prev_torque(0) |
||||||
|
for t in np.arange(0, MAX_RT_DELTA, 1): |
||||||
|
t *= sign |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(t))) |
||||||
|
|
||||||
|
# Increase timer to update rt_torque_last |
||||||
|
self.safety.set_timer(RT_INTERVAL + 1) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(sign * (MAX_RT_DELTA - 1)))) |
||||||
|
self.assertTrue(self.safety.cadillac_tx_hook(self._torque_msg(sign * (MAX_RT_DELTA + 1)))) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
unittest.main() |
@ -0,0 +1,270 @@ |
|||||||
|
#!/usr/bin/env python2 |
||||||
|
import unittest |
||||||
|
import numpy as np |
||||||
|
import libpandasafety_py |
||||||
|
|
||||||
|
MAX_RATE_UP = 7 |
||||||
|
MAX_RATE_DOWN = 17 |
||||||
|
MAX_STEER = 255 |
||||||
|
MAX_BRAKE = 350 |
||||||
|
MAX_GAS = 3072 |
||||||
|
MAX_REGEN = 1404 |
||||||
|
|
||||||
|
MAX_RT_DELTA = 128 |
||||||
|
RT_INTERVAL = 250000 |
||||||
|
|
||||||
|
DRIVER_TORQUE_ALLOWANCE = 50; |
||||||
|
DRIVER_TORQUE_FACTOR = 4; |
||||||
|
|
||||||
|
def twos_comp(val, bits): |
||||||
|
if val >= 0: |
||||||
|
return val |
||||||
|
else: |
||||||
|
return (2**bits) + val |
||||||
|
|
||||||
|
def sign(a): |
||||||
|
if a > 0: |
||||||
|
return 1 |
||||||
|
else: |
||||||
|
return -1 |
||||||
|
|
||||||
|
class TestGmSafety(unittest.TestCase): |
||||||
|
@classmethod |
||||||
|
def setUp(cls): |
||||||
|
cls.safety = libpandasafety_py.libpandasafety |
||||||
|
cls.safety.gm_init(0) |
||||||
|
cls.safety.init_tests_gm() |
||||||
|
|
||||||
|
def _speed_msg(self, speed): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 842 << 21 |
||||||
|
to_send[0].RDLR = speed |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _button_msg(self, buttons): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 481 << 21 |
||||||
|
to_send[0].RDHR = buttons << 12 |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _brake_msg(self, brake): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 241 << 21 |
||||||
|
to_send[0].RDLR = 0xa00 if brake else 0x900 |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _gas_msg(self, gas): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 417 << 21 |
||||||
|
to_send[0].RDHR = (1 << 16) if gas else 0 |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _send_brake_msg(self, brake): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 789 << 21 |
||||||
|
brake = (-brake) & 0xfff |
||||||
|
to_send[0].RDLR = (brake >> 8) | ((brake &0xff) << 8) |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _send_gas_msg(self, gas): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 715 << 21 |
||||||
|
to_send[0].RDLR = ((gas & 0x1f) << 27) | ((gas & 0xfe0) << 11) |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _set_prev_torque(self, t): |
||||||
|
self.safety.set_gm_desired_torque_last(t) |
||||||
|
self.safety.set_gm_rt_torque_last(t) |
||||||
|
|
||||||
|
def _torque_driver_msg(self, torque): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 388 << 21 |
||||||
|
|
||||||
|
t = twos_comp(torque, 11) |
||||||
|
to_send[0].RDHR = (((t >> 8) & 0x7) << 16) | ((t & 0xFF) << 24) |
||||||
|
return to_send |
||||||
|
|
||||||
|
def _torque_driver_msg_array(self, torque): |
||||||
|
for i in range(3): |
||||||
|
self.safety.gm_ipas_rx_hook(self._torque_driver_msg(torque)) |
||||||
|
|
||||||
|
def _torque_msg(self, torque): |
||||||
|
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') |
||||||
|
to_send[0].RIR = 384 << 21 |
||||||
|
|
||||||
|
t = twos_comp(torque, 11) |
||||||
|
to_send[0].RDLR = ((t >> 8) & 0x7) | ((t & 0xFF) << 8) |
||||||
|
return to_send |
||||||
|
|
||||||
|
def test_default_controls_not_allowed(self): |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_resume_button(self): |
||||||
|
RESUME_BTN = 2 |
||||||
|
self.safety.set_controls_allowed(0) |
||||||
|
self.safety.gm_rx_hook(self._button_msg(RESUME_BTN)) |
||||||
|
self.assertTrue(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_set_button(self): |
||||||
|
SET_BTN = 3 |
||||||
|
self.safety.set_controls_allowed(0) |
||||||
|
self.safety.gm_rx_hook(self._button_msg(SET_BTN)) |
||||||
|
self.assertTrue(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_cancel_button(self): |
||||||
|
CANCEL_BTN = 6 |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
self.safety.gm_rx_hook(self._button_msg(CANCEL_BTN)) |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_disengage_on_brake(self): |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
self.safety.gm_rx_hook(self._brake_msg(True)) |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_allow_brake_at_zero_speed(self): |
||||||
|
# Brake was already pressed |
||||||
|
self.safety.gm_rx_hook(self._brake_msg(True)) |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
|
||||||
|
self.safety.gm_rx_hook(self._brake_msg(True)) |
||||||
|
self.assertTrue(self.safety.get_controls_allowed()) |
||||||
|
self.safety.gm_rx_hook(self._brake_msg(False)) |
||||||
|
|
||||||
|
def test_not_allow_brake_when_moving(self): |
||||||
|
# Brake was already pressed |
||||||
|
self.safety.gm_rx_hook(self._brake_msg(True)) |
||||||
|
self.safety.gm_rx_hook(self._speed_msg(100)) |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
|
||||||
|
self.safety.gm_rx_hook(self._brake_msg(True)) |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
self.safety.gm_rx_hook(self._brake_msg(False)) |
||||||
|
|
||||||
|
def test_disengage_on_gas(self): |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
self.safety.gm_rx_hook(self._gas_msg(True)) |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
self.safety.gm_rx_hook(self._gas_msg(False)) |
||||||
|
|
||||||
|
def test_allow_engage_with_gas_pressed(self): |
||||||
|
self.safety.gm_rx_hook(self._gas_msg(True)) |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
self.safety.gm_rx_hook(self._gas_msg(True)) |
||||||
|
self.assertTrue(self.safety.get_controls_allowed()) |
||||||
|
self.safety.gm_rx_hook(self._gas_msg(False)) |
||||||
|
|
||||||
|
def test_brake_safety_check(self): |
||||||
|
for enabled in [0, 1]: |
||||||
|
for b in range(0, 500): |
||||||
|
self.safety.set_controls_allowed(enabled) |
||||||
|
if abs(b) > MAX_BRAKE or (not enabled and b != 0): |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._send_brake_msg(b))) |
||||||
|
else: |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._send_brake_msg(b))) |
||||||
|
|
||||||
|
def test_gas_safety_check(self): |
||||||
|
for enabled in [0, 1]: |
||||||
|
for g in range(0, 2**12-1): |
||||||
|
self.safety.set_controls_allowed(enabled) |
||||||
|
if abs(g) > MAX_GAS or (not enabled and g != MAX_REGEN): |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._send_gas_msg(g))) |
||||||
|
else: |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._send_gas_msg(g))) |
||||||
|
|
||||||
|
def test_steer_safety_check(self): |
||||||
|
for enabled in [0, 1]: |
||||||
|
for t in range(-0x200, 0x200): |
||||||
|
self.safety.set_controls_allowed(enabled) |
||||||
|
self._set_prev_torque(t) |
||||||
|
if abs(t) > MAX_STEER or (not enabled and abs(t) > 0): |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._torque_msg(t))) |
||||||
|
else: |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(t))) |
||||||
|
|
||||||
|
def test_manually_enable_controls_allowed(self): |
||||||
|
self.safety.set_controls_allowed(1) |
||||||
|
self.assertTrue(self.safety.get_controls_allowed()) |
||||||
|
self.safety.set_controls_allowed(0) |
||||||
|
self.assertFalse(self.safety.get_controls_allowed()) |
||||||
|
|
||||||
|
def test_non_realtime_limit_up(self): |
||||||
|
self.safety.set_gm_torque_driver(0, 0) |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(MAX_RATE_UP))) |
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(-MAX_RATE_UP))) |
||||||
|
|
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._torque_msg(MAX_RATE_UP + 1))) |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
self._set_prev_torque(0) |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._torque_msg(-MAX_RATE_UP - 1))) |
||||||
|
|
||||||
|
def test_non_realtime_limit_down(self): |
||||||
|
self.safety.set_gm_torque_driver(0, 0) |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
def test_against_torque_driver(self): |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
for sign in [-1, 1]: |
||||||
|
for t in np.arange(0, DRIVER_TORQUE_ALLOWANCE + 1, 1): |
||||||
|
t *= -sign |
||||||
|
self.safety.set_gm_torque_driver(t, t) |
||||||
|
self._set_prev_torque(MAX_STEER * sign) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(MAX_STEER * sign))) |
||||||
|
|
||||||
|
self.safety.set_gm_torque_driver(DRIVER_TORQUE_ALLOWANCE + 1, DRIVER_TORQUE_ALLOWANCE + 1) |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._torque_msg(-MAX_STEER))) |
||||||
|
|
||||||
|
# spot check some individual cases |
||||||
|
for sign in [-1, 1]: |
||||||
|
driver_torque = (DRIVER_TORQUE_ALLOWANCE + 10) * sign |
||||||
|
torque_desired = (MAX_STEER - 10 * DRIVER_TORQUE_FACTOR) * sign |
||||||
|
delta = 1 * sign |
||||||
|
self._set_prev_torque(torque_desired) |
||||||
|
self.safety.set_gm_torque_driver(-driver_torque, -driver_torque) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(torque_desired))) |
||||||
|
self._set_prev_torque(torque_desired + delta) |
||||||
|
self.safety.set_gm_torque_driver(-driver_torque, -driver_torque) |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._torque_msg(torque_desired + delta))) |
||||||
|
|
||||||
|
self._set_prev_torque(MAX_STEER * sign) |
||||||
|
self.safety.set_gm_torque_driver(-MAX_STEER * sign, -MAX_STEER * sign) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg((MAX_STEER - MAX_RATE_DOWN) * sign))) |
||||||
|
self._set_prev_torque(MAX_STEER * sign) |
||||||
|
self.safety.set_gm_torque_driver(-MAX_STEER * sign, -MAX_STEER * sign) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(0))) |
||||||
|
self._set_prev_torque(MAX_STEER * sign) |
||||||
|
self.safety.set_gm_torque_driver(-MAX_STEER * sign, -MAX_STEER * sign) |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._torque_msg((MAX_STEER - MAX_RATE_DOWN + 1) * sign))) |
||||||
|
|
||||||
|
|
||||||
|
def test_realtime_limits(self): |
||||||
|
self.safety.set_controls_allowed(True) |
||||||
|
|
||||||
|
for sign in [-1, 1]: |
||||||
|
self.safety.init_tests_gm() |
||||||
|
self._set_prev_torque(0) |
||||||
|
self.safety.set_gm_torque_driver(0, 0) |
||||||
|
for t in np.arange(0, MAX_RT_DELTA, 1): |
||||||
|
t *= sign |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(t))) |
||||||
|
self.assertFalse(self.safety.gm_tx_hook(self._torque_msg(sign * (MAX_RT_DELTA + 1)))) |
||||||
|
|
||||||
|
self._set_prev_torque(0) |
||||||
|
for t in np.arange(0, MAX_RT_DELTA, 1): |
||||||
|
t *= sign |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(t))) |
||||||
|
|
||||||
|
# Increase timer to update rt_torque_last |
||||||
|
self.safety.set_timer(RT_INTERVAL + 1) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(sign * (MAX_RT_DELTA - 1)))) |
||||||
|
self.assertTrue(self.safety.gm_tx_hook(self._torque_msg(sign * (MAX_RT_DELTA + 1)))) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
unittest.main() |
Loading…
Reference in new issue