@ -1,15 +1,13 @@
#!/usr/bin/env python3
import os
import time
import unittest
import numpy as np
from collections import namedtuple
from smbus2 import SMBus
import cereal . messaging as messaging
from cereal import log
from system . hardware import TICI , HARDWARE
from selfdrive . test . helpers import with_processes
from selfdrive . manager . process_config import managed_processes
SENSOR_CONFIGURATIONS = (
@ -50,33 +48,33 @@ SENSOR_CONFIGURATIONS = (
)
Sensor = log . SensorEventData . SensorSource
SensorConfig = namedtuple ( ' SensorConfig ' , [ ' type ' , ' min_samples ' , ' sanity_min' , ' sanity_max ' ] )
SensorConfig = namedtuple ( ' SensorConfig ' , [ ' type ' , ' sanity_min ' , ' sanity_max ' ] )
ALL_SENSORS = {
Sensor . rpr0521 : {
SensorConfig ( " light " , 100 , 0 , 150 ) ,
SensorConfig ( " light " , 0 , 150 ) ,
} ,
Sensor . lsm6ds3 : {
SensorConfig ( " acceleration " , 100 , 5 , 15 ) ,
SensorConfig ( " gyroUncalibrated " , 100 , 0 , .2 ) ,
SensorConfig ( " temperature " , 100 , 0 , 60 ) ,
SensorConfig ( " acceleration " , 5 , 15 ) ,
SensorConfig ( " gyroUncalibrated " , 0 , .2 ) ,
SensorConfig ( " temperature " , 0 , 60 ) ,
} ,
Sensor . lsm6ds3trc : {
SensorConfig ( " acceleration " , 100 , 5 , 15 ) ,
SensorConfig ( " gyroUncalibrated " , 100 , 0 , .2 ) ,
SensorConfig ( " temperature " , 100 , 0 , 60 ) ,
SensorConfig ( " acceleration " , 5 , 15 ) ,
SensorConfig ( " gyroUncalibrated " , 0 , .2 ) ,
SensorConfig ( " temperature " , 0 , 60 ) ,
} ,
Sensor . bmx055 : {
SensorConfig ( " acceleration " , 100 , 5 , 15 ) ,
SensorConfig ( " gyroUncalibrated " , 100 , 0 , .2 ) ,
SensorConfig ( " magneticUncalibrated " , 100 , 0 , 300 ) ,
SensorConfig ( " temperature " , 100 , 0 , 60 ) ,
SensorConfig ( " acceleration " , 5 , 15 ) ,
SensorConfig ( " gyroUncalibrated " , 0 , .2 ) ,
SensorConfig ( " magneticUncalibrated " , 0 , 300 ) ,
SensorConfig ( " temperature " , 0 , 60 ) ,
} ,
Sensor . mmc5603nj : {
SensorConfig ( " magneticUncalibrated " , 100 , 0 , 300 ) ,
SensorConfig ( " magneticUncalibrated " , 0 , 300 ) ,
}
}
@ -96,7 +94,6 @@ def read_sensor_events(duration_sec):
return events
def get_proc_interrupts ( int_pin ) :
with open ( " /proc/interrupts " ) as f :
lines = f . read ( ) . split ( " \n " )
@ -117,12 +114,18 @@ class TestSensord(unittest.TestCase):
HARDWARE . initialize_hardware ( )
# read initial sensor values every test case can use
os . system ( " pkill -f ./_sensord " )
cls . sample_secs = 5
managed_processes [ " sensord " ] . start ( )
cls . events = read_sensor_events ( 5 )
time . sleep ( 2 )
cls . events = read_sensor_events ( cls . sample_secs )
managed_processes [ " sensord " ] . stop ( )
@classmethod
def tearDownClass ( cls ) :
managed_processes [ " sensord " ] . stop ( )
def tearDown ( self ) :
# interrupt check might leave sensord running
managed_processes [ " sensord " ] . stop ( )
def test_sensors_present ( self ) :
@ -138,34 +141,32 @@ class TestSensord(unittest.TestCase):
self . assertIn ( seen , SENSOR_CONFIGURATIONS )
def test_lsm6ds3_100Hz ( self ) :
# verify measurements are sampled and published at a 100Hz rate
def test_lsm6ds3_timing ( self ) :
# verify measurements are sampled and published at 104Hz
data_points = set ( )
sensor_t = {
1 : [ ] , # accel
5 : [ ] , # gyro
}
for event in self . events :
for measurement in event . sensorEvents :
if str ( measurement . source ) . startswith ( " lsm6ds3 " ) and measurement . sensor in sensor_t :
sensor_t [ measurement . sensor ] . append ( measurement . timestamp )
# skip lsm6ds3 temperature measurements
if measurement . which ( ) == ' temperature ' :
continue
if str ( measurement . source ) . startswith ( " lsm6ds3 " ) :
data_points . add ( measurement . timestamp )
assert len ( data_points ) != 0 , " No lsm6ds3 sensor events "
data_list = list ( data_points )
data_list . sort ( )
tdiffs = np . diff ( data_list )
for s , vals in sensor_t . items ( ) :
with self . subTest ( sensor = s ) :
assert len ( vals ) > 0
tdiffs = np . diff ( vals ) / 1e6 # millis
high_delay_diffs = se t ( filter ( lambda d : d > = 10.1 * 10 * * 6 , tdiffs ) )
assert len ( high_delay_diffs ) < 10 , f " Too many high delay package s: { high_delay_diffs } "
high_delay_diffs = list ( filter ( lambda d : d > = 20. , tdiffs ) )
assert len ( high_delay_diffs ) < 15 , f " Too many large diffs: { high_delay_diffs } "
# 100-108Hz, expected 104Hz
avg_diff = sum ( tdiffs ) / len ( tdiffs )
assert avg_diff > 9.6 * 10 * * 6 , f " avg difference { avg_diff } , below threshold "
assert 9.3 < avg_diff < 10. , f " avg difference { avg_diff } , below threshold "
stddev = np . std ( tdiffs )
assert stddev < 1.5 * 10 * * 6 , f " Standard-dev to big { stddev } "
assert stddev < 2.0 , f " Standard-dev to big { stddev } "
def test_events_check ( self ) :
# verify if all sensors produce events
@ -217,13 +218,9 @@ class TestSensord(unittest.TestCase):
stddev = np . std ( tdiffs )
assert stddev < 2 * 10 * * 6 , f " Timing diffs have to high stddev: { stddev } "
@with_processes ( [ ' sensord ' ] )
def test_sensor_values_sanity_check ( self ) :
events = read_sensor_events ( 2 )
sensor_values = dict ( )
for event in events :
for event in self . events :
for m in event . sensorEvents :
# filter unset events (bmx magn)
@ -250,8 +247,9 @@ class TestSensord(unittest.TestCase):
key = ( sensor , s . type )
val_cnt = len ( sensor_values [ key ] )
err_msg = f " Sensor { sensor } { s . type } got { val_cnt } measurements, expected { s . min_samples } "
assert val_cnt > s . min_samples , err_msg
min_samples = self . sample_secs * 100 # Hz
err_msg = f " Sensor { sensor } { s . type } got { val_cnt } measurements, expected { min_samples } "
assert min_samples * 0.9 < val_cnt < min_samples * 1.1 , err_msg
mean_norm = np . mean ( np . linalg . norm ( sensor_values [ key ] , axis = 1 ) )
err_msg = f " Sensor ' { sensor } { s . type } ' failed sanity checks { mean_norm } is not between { s . sanity_min } and { s . sanity_max } "
@ -260,26 +258,17 @@ class TestSensord(unittest.TestCase):
def test_sensor_verify_no_interrupts_after_stop ( self ) :
managed_processes [ " sensord " ] . start ( )
time . sleep ( 1 )
# check if the interrupts are enableds
with SMBus ( SENSOR_BUS , force = True ) as bus :
int1_ctrl_reg = bus . read_byte_data ( I2C_ADDR_LSM , 0x0D )
assert int1_ctrl_reg == 3 , " Interrupts not enabled! "
time . sleep ( 3 )
# read /proc/interrupts to verify interrupts are received
state_one = get_proc_interrupts ( LSM_INT_GPIO )
time . sleep ( 1 )
state_two = get_proc_interrupts ( LSM_INT_GPIO )
assert state_one != state_two , " no Interrupts received after sensord start! "
assert state_one != state_two , f " no interrupts received after sensord start! \n { state_one } { state_two } "
managed_processes [ " sensord " ] . stop ( )
# check if the interrupts got disabled
with SMBus ( SENSOR_BUS , force = True ) as bus :
int1_ctrl_reg = bus . read_byte_data ( I2C_ADDR_LSM , 0x0D )
assert int1_ctrl_reg == 0 , " Interrupts not disabled! "
time . sleep ( 1 )
# read /proc/interrupts to verify no more interrupts are received
state_one = get_proc_interrupts ( LSM_INT_GPIO )