openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

190 lines
7.5 KiB

import numpy as np
from collections import defaultdict
from enum import Enum
from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.process_replay.migration import migrate_all
from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name
# TODO find a new segment to test
TEST_ROUTE = "4019fff6e54cf1c7|00000123--4bc0d95ef6/5"
GPS_MESSAGES = ['gpsLocationExternal', 'gpsLocation']
SELECT_COMPARE_FIELDS = {
'yaw_rate': ['angularVelocityDevice', 'z'],
'roll': ['orientationNED', 'x'],
'inputs_flag': ['inputsOK'],
'sensors_flag': ['sensorsOK'],
}
JUNK_IDX = 100
CONSISTENT_SPIKES_COUNT = 10
class Scenario(Enum):
BASE = 'base'
GYRO_OFF = 'gyro_off'
GYRO_SPIKE_MIDWAY = 'gyro_spike_midway'
GYRO_CONSISTENT_SPIKES = 'gyro_consistent_spikes'
ACCEL_OFF = 'accel_off'
ACCEL_SPIKE_MIDWAY = 'accel_spike_midway'
ACCEL_CONSISTENT_SPIKES = 'accel_consistent_spikes'
SENSOR_TIMING_SPIKE_MIDWAY = 'timing_spikes'
SENSOR_TIMING_CONSISTENT_SPIKES = 'timing_consistent_spikes'
def get_select_fields_data(logs):
def get_nested_keys(msg, keys):
val = None
for key in keys:
val = getattr(msg if val is None else val, key) if isinstance(key, str) else val[key]
return val
lp = [x.livePose for x in logs if x.which() == 'livePose']
data = defaultdict(list)
for msg in lp:
for key, fields in SELECT_COMPARE_FIELDS.items():
data[key].append(get_nested_keys(msg, fields))
for key in data:
data[key] = np.array(data[key][JUNK_IDX:], dtype=float)
return data
def modify_logs_midway(logs, which, count, fn):
non_which = [x for x in logs if x.which() != which]
which = [x for x in logs if x.which() == which]
temps = which[len(which) // 2:len(which) // 2 + count]
for i, temp in enumerate(temps):
temp = temp.as_builder()
fn(temp)
which[len(which) // 2 + i] = temp.as_reader()
return sorted(non_which + which, key=lambda x: x.logMonoTime)
def run_scenarios(scenario, logs):
if scenario == Scenario.BASE:
pass
elif scenario == Scenario.GYRO_OFF:
logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime)
elif scenario == Scenario.GYRO_SPIKE_MIDWAY or scenario == Scenario.GYRO_CONSISTENT_SPIKES:
def gyro_spike(msg):
msg.gyroscope.gyroUncalibrated.v[0] += 3.0
count = 1 if scenario == Scenario.GYRO_SPIKE_MIDWAY else CONSISTENT_SPIKES_COUNT
logs = modify_logs_midway(logs, 'gyroscope', count, gyro_spike)
elif scenario == Scenario.ACCEL_OFF:
logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime)
elif scenario == Scenario.ACCEL_SPIKE_MIDWAY or scenario == Scenario.ACCEL_CONSISTENT_SPIKES:
def acc_spike(msg):
msg.accelerometer.acceleration.v[0] += 100.0
count = 1 if scenario == Scenario.ACCEL_SPIKE_MIDWAY else CONSISTENT_SPIKES_COUNT
logs = modify_logs_midway(logs, 'accelerometer', count, acc_spike)
elif scenario == Scenario.SENSOR_TIMING_SPIKE_MIDWAY or scenario == Scenario.SENSOR_TIMING_CONSISTENT_SPIKES:
def timing_spike(msg):
msg.accelerometer.timestamp -= int(0.150 * 1e9)
count = 1 if scenario == Scenario.SENSOR_TIMING_SPIKE_MIDWAY else CONSISTENT_SPIKES_COUNT
logs = modify_logs_midway(logs, 'accelerometer', count, timing_spike)
replayed_logs = replay_process_with_name(name='locationd', lr=logs)
return get_select_fields_data(logs), get_select_fields_data(replayed_logs)
class TestLocationdScenarios:
"""
Test locationd with different scenarios. In all these scenarios, we expect the following:
- locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK)
- faulty values should be ignored, with appropriate flags set
"""
@classmethod
def setup_class(cls):
cls.logs = migrate_all(LogReader(TEST_ROUTE))
def test_base(self):
"""
Test: unchanged log
Expected Result:
- yaw_rate: unchanged
- roll: unchanged
"""
orig_data, replayed_data = run_scenarios(Scenario.BASE, self.logs)
assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))
def test_gyro_off(self):
"""
Test: no gyroscope message for the entire segment
Expected Result:
- yaw_rate: 0
- roll: 0
- sensorsOK: False
"""
_, replayed_data = run_scenarios(Scenario.GYRO_OFF, self.logs)
assert np.allclose(replayed_data['yaw_rate'], 0.0)
assert np.allclose(replayed_data['roll'], 0.0)
assert np.all(replayed_data['sensors_flag'] == 0.0)
def test_gyro_spike(self):
"""
Test: a gyroscope spike in the middle of the segment
Expected Result:
- yaw_rate: unchanged
- roll: unchanged
- inputsOK: False for some time after the spike, True for the rest
"""
orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY, self.logs)
assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))
assert np.all(replayed_data['inputs_flag'] == orig_data['inputs_flag'])
assert np.all(replayed_data['sensors_flag'] == orig_data['sensors_flag'])
def test_consistent_gyro_spikes(self):
"""
Test: consistent timing spikes for N gyroscope messages in the middle of the segment
Expected Result: inputsOK becomes False after N of bad measurements
"""
orig_data, replayed_data = run_scenarios(Scenario.GYRO_CONSISTENT_SPIKES, self.logs)
assert np.diff(replayed_data['inputs_flag'])[501] == -1.0
assert np.diff(replayed_data['inputs_flag'])[708] == 1.0
def test_accel_off(self):
"""
Test: no accelerometer message for the entire segment
Expected Result:
- yaw_rate: 0
- roll: 0
- sensorsOK: False
"""
_, replayed_data = run_scenarios(Scenario.ACCEL_OFF, self.logs)
assert np.allclose(replayed_data['yaw_rate'], 0.0)
assert np.allclose(replayed_data['roll'], 0.0)
assert np.all(replayed_data['sensors_flag'] == 0.0)
def test_accel_spike(self):
"""
ToDo:
Test: an accelerometer spike in the middle of the segment
Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes.
"""
orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY, self.logs)
assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))
def test_single_timing_spike(self):
"""
Test: timing of 150ms off for the single accelerometer message in the middle of the segment
Expected Result: the message is ignored, and inputsOK is False for that time
"""
orig_data, replayed_data = run_scenarios(Scenario.SENSOR_TIMING_SPIKE_MIDWAY, self.logs)
assert np.all(replayed_data['inputs_flag'] == orig_data['inputs_flag'])
assert np.all(replayed_data['sensors_flag'] == orig_data['sensors_flag'])
def test_consistent_timing_spikes(self):
"""
Test: consistent timing spikes for N accelerometer messages in the middle of the segment
Expected Result: inputsOK becomes False after N of bad measurements
"""
orig_data, replayed_data = run_scenarios(Scenario.SENSOR_TIMING_CONSISTENT_SPIKES, self.logs)
assert np.diff(replayed_data['inputs_flag'])[501] == -1.0
assert np.diff(replayed_data['inputs_flag'])[707] == 1.0