openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

289 lines
8.6 KiB

#!/usr/bin/env python3
import time
import unittest
import numpy as np
from collections import namedtuple
from smbus2 import SMBus
import cereal.messaging as messaging
from cereal import log
from system.hardware import TICI
from selfdrive.test.helpers import with_processes
from selfdrive.manager.process_config import managed_processes
SENSOR_CONFIGURATIONS = (
{
('bmx055', 'acceleration'),
('bmx055', 'gyroUncalibrated'),
('bmx055', 'magneticUncalibrated'),
('bmx055', 'temperature'),
('lsm6ds3', 'acceleration'),
('lsm6ds3', 'gyroUncalibrated'),
('lsm6ds3', 'temperature'),
('rpr0521', 'light'),
},
{
('lsm6ds3', 'acceleration'),
('lsm6ds3', 'gyroUncalibrated'),
('lsm6ds3', 'temperature'),
('mmc5603nj', 'magneticUncalibrated'),
('rpr0521', 'light'),
},
{
('bmx055', 'acceleration'),
('bmx055', 'gyroUncalibrated'),
('bmx055', 'magneticUncalibrated'),
('bmx055', 'temperature'),
('lsm6ds3trc', 'acceleration'),
('lsm6ds3trc', 'gyroUncalibrated'),
('lsm6ds3trc', 'temperature'),
('rpr0521', 'light'),
},
{
('lsm6ds3trc', 'acceleration'),
('lsm6ds3trc', 'gyroUncalibrated'),
('lsm6ds3trc', 'temperature'),
('mmc5603nj', 'magneticUncalibrated'),
('rpr0521', 'light'),
},
)
Sensor = log.SensorEventData.SensorSource
SensorConfig = namedtuple('SensorConfig', ['type', 'min_samples', 'sanity_min', 'sanity_max'])
ALL_SENSORS = {
Sensor.rpr0521: {
SensorConfig("light", 100, 0, 150),
},
Sensor.lsm6ds3: {
SensorConfig("acceleration", 100, 5, 15),
SensorConfig("gyroUncalibrated", 100, 0, .2),
SensorConfig("temperature", 100, 0, 60),
},
Sensor.lsm6ds3trc: {
SensorConfig("acceleration", 100, 5, 15),
SensorConfig("gyroUncalibrated", 100, 0, .2),
SensorConfig("temperature", 100, 0, 60),
},
Sensor.bmx055: {
SensorConfig("acceleration", 100, 5, 15),
SensorConfig("gyroUncalibrated", 100, 0, .2),
SensorConfig("magneticUncalibrated", 100, 0, 300),
SensorConfig("temperature", 100, 0, 60),
},
Sensor.mmc5603nj: {
SensorConfig("magneticUncalibrated", 100, 0, 300),
}
}
SENSOR_BUS = 1
I2C_ADDR_LSM = 0x6A
LSM_INT_GPIO = 84
def read_sensor_events(duration_sec):
sensor_events = messaging.sub_sock("sensorEvents", timeout=0.1)
start_time_sec = time.monotonic()
events = []
while time.monotonic() - start_time_sec < duration_sec:
events += messaging.drain_sock(sensor_events)
time.sleep(0.01)
assert len(events) != 0, "No sensor events collected"
return events
def get_proc_interrupts(int_pin):
with open("/proc/interrupts") as f:
lines = f.read().split("\n")
for line in lines:
if f" {int_pin} " in line:
return ''.join(list(filter(lambda e: e != '', line.split(' ')))[1:6])
return ""
class TestSensord(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not TICI:
raise unittest.SkipTest
@with_processes(['sensord'])
def test_sensors_present(self):
# verify correct sensors configuration
events = read_sensor_events(10)
seen = set()
for event in events:
for measurement in event.sensorEvents:
# filter unset events (bmx magn)
if measurement.version == 0:
continue
seen.add((str(measurement.source), measurement.which()))
self.assertIn(seen, SENSOR_CONFIGURATIONS)
@with_processes(['sensord'])
def test_lsm6ds3_100Hz(self):
# verify measurements are sampled and published at a 100Hz rate
events = read_sensor_events(3) # 3sec (about 300 measurements)
data_points = set()
for event in events:
for measurement in event.sensorEvents:
# skip lsm6ds3 temperature measurements
if measurement.which() == 'temperature':
continue
if str(measurement.source).startswith("lsm6ds3"):
data_points.add(measurement.timestamp)
assert len(data_points) != 0, "No lsm6ds3 sensor events"
data_list = list(data_points)
data_list.sort()
tdiffs = np.diff(data_list)
high_delay_diffs = set(filter(lambda d: d >= 10.1*10**6, tdiffs))
assert len(high_delay_diffs) < 10, f"Too many high delay packages: {high_delay_diffs}"
avg_diff = sum(tdiffs)/len(tdiffs)
assert avg_diff > 9.6*10**6, f"avg difference {avg_diff}, below threshold"
stddev = np.std(tdiffs)
assert stddev < 1.5*10**6, f"Standard-dev to big {stddev}"
@with_processes(['sensord'])
def test_events_check(self):
# verify if all sensors produce events
events = read_sensor_events(3)
sensor_events = dict()
for event in events:
for measurement in event.sensorEvents:
# filter unset events (bmx magn)
if measurement.version == 0:
continue
if measurement.type in sensor_events:
sensor_events[measurement.type] += 1
else:
sensor_events[measurement.type] = 1
for s in sensor_events:
err_msg = f"Sensor {s}: 200 < {sensor_events[s]}"
assert sensor_events[s] > 200, err_msg
@with_processes(['sensord'])
def test_logmonottime_timestamp_diff(self):
# ensure diff between the message logMonotime and sample timestamp is small
events = read_sensor_events(3)
tdiffs = list()
for event in events:
for measurement in event.sensorEvents:
# filter unset events (bmx magn)
if measurement.version == 0:
continue
# check if gyro and accel timestamps are before logMonoTime
if str(measurement.source).startswith("lsm6ds3"):
if measurement.which() != 'temperature':
err_msg = f"Timestamp after logMonoTime: {measurement.timestamp} > {event.logMonoTime}"
assert measurement.timestamp < event.logMonoTime, err_msg
# negative values might occur, as non interrupt packages created
# before the sensor is read
tdiffs.append(abs(event.logMonoTime - measurement.timestamp))
high_delay_diffs = set(filter(lambda d: d >= 10*10**6, tdiffs))
assert len(high_delay_diffs) < 15, f"Too many high delay packages: {high_delay_diffs}"
avg_diff = round(sum(tdiffs)/len(tdiffs), 4)
assert avg_diff < 4*10**6, f"Avg packet diff: {avg_diff:.1f}ns"
stddev = np.std(tdiffs)
assert stddev < 2*10**6, f"Timing diffs have to high stddev: {stddev}"
@with_processes(['sensord'])
def test_sensor_values_sanity_check(self):
events = read_sensor_events(2)
sensor_values = dict()
for event in events:
for m in event.sensorEvents:
# filter unset events (bmx magn)
if m.version == 0:
continue
key = (m.source.raw, m.which())
values = getattr(m, m.which())
if hasattr(values, 'v'):
values = values.v
values = np.atleast_1d(values)
if key in sensor_values:
sensor_values[key].append(values)
else:
sensor_values[key] = [values]
# Sanity check sensor values and counts
for sensor, stype in sensor_values:
for s in ALL_SENSORS[sensor]:
if s.type != stype:
continue
key = (sensor, s.type)
val_cnt = len(sensor_values[key])
err_msg = f"Sensor {sensor} {s.type} got {val_cnt} measurements, expected {s.min_samples}"
assert val_cnt > s.min_samples, err_msg
mean_norm = np.mean(np.linalg.norm(sensor_values[key], axis=1))
err_msg = f"Sensor '{sensor} {s.type}' failed sanity checks {mean_norm} is not between {s.sanity_min} and {s.sanity_max}"
assert s.sanity_min <= mean_norm <= s.sanity_max, err_msg
def test_sensor_verify_no_interrupts_after_stop(self):
managed_processes["sensord"].start()
time.sleep(1)
# check if the interrupts are enableds
with SMBus(SENSOR_BUS, force=True) as bus:
int1_ctrl_reg = bus.read_byte_data(I2C_ADDR_LSM, 0x0D)
assert int1_ctrl_reg == 3, "Interrupts not enabled!"
# read /proc/interrupts to verify interrupts are received
state_one = get_proc_interrupts(LSM_INT_GPIO)
time.sleep(1)
state_two = get_proc_interrupts(LSM_INT_GPIO)
assert state_one != state_two, "no Interrupts received after sensord start!"
managed_processes["sensord"].stop()
# check if the interrupts got disabled
with SMBus(SENSOR_BUS, force=True) as bus:
int1_ctrl_reg = bus.read_byte_data(I2C_ADDR_LSM, 0x0D)
assert int1_ctrl_reg == 0, "Interrupts not disabled!"
# read /proc/interrupts to verify no more interrupts are received
state_one = get_proc_interrupts(LSM_INT_GPIO)
time.sleep(1)
state_two = get_proc_interrupts(LSM_INT_GPIO)
assert state_one == state_two, "Interrupts received after sensord stop!"
if __name__ == "__main__":
unittest.main()