openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

224 lines
8.6 KiB

#!/usr/bin/env python3
import unittest
import numpy as np
from collections import defaultdict
from enum import Enum
from openpilot.selfdrive.test.openpilotci import get_url
from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name
TEST_ROUTE, TEST_SEG_NUM = "ff2bd20623fcaeaa|2023-09-05--10-14-54", 4
GPS_MESSAGES = ['gpsLocationExternal', 'gpsLocation']
SELECT_COMPARE_FIELDS = {
'yaw_rate': ['angularVelocityCalibrated', 'value', 2],
'roll': ['orientationNED', 'value', 0],
'gps_flag': ['gpsOK'],
'inputs_flag': ['inputsOK'],
'sensors_flag': ['sensorsOK'],
}
JUNK_IDX = 100
class Scenario(Enum):
BASE = 'base'
GPS_OFF = 'gps_off'
GPS_OFF_MIDWAY = 'gps_off_midway'
GPS_ON_MIDWAY = 'gps_on_midway'
GPS_TUNNEL = 'gps_tunnel'
GYRO_OFF = 'gyro_off'
GYRO_SPIKE_MIDWAY = 'gyro_spike_midway'
ACCEL_OFF = 'accel_off'
ACCEL_SPIKE_MIDWAY = 'accel_spike_midway'
def get_select_fields_data(logs):
def get_nested_keys(msg, keys):
val = None
for key in keys:
val = getattr(msg if val is None else val, key) if isinstance(key, str) else val[key]
return val
llk = [x.liveLocationKalman for x in logs if x.which() == 'liveLocationKalman']
data = defaultdict(list)
for msg in llk:
for key, fields in SELECT_COMPARE_FIELDS.items():
data[key].append(get_nested_keys(msg, fields))
for key in data:
data[key] = np.array(data[key][JUNK_IDX:], dtype=float)
return data
def run_scenarios(scenario):
logs = list(LogReader(get_url(TEST_ROUTE, TEST_SEG_NUM)))
if scenario == Scenario.BASE:
pass
elif scenario == Scenario.GPS_OFF:
logs = sorted([x for x in logs if x.which() not in GPS_MESSAGES], key=lambda x: x.logMonoTime)
elif scenario == Scenario.GPS_OFF_MIDWAY:
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
gps = [x for x in logs if x.which() in GPS_MESSAGES]
logs = sorted(non_gps + gps[: len(gps) // 2], key=lambda x: x.logMonoTime)
elif scenario == Scenario.GPS_ON_MIDWAY:
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
gps = [x for x in logs if x.which() in GPS_MESSAGES]
logs = sorted(non_gps + gps[len(gps) // 2:], key=lambda x: x.logMonoTime)
elif scenario == Scenario.GPS_TUNNEL:
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
gps = [x for x in logs if x.which() in GPS_MESSAGES]
logs = sorted(non_gps + gps[:len(gps) // 4] + gps[-len(gps) // 4:], key=lambda x: x.logMonoTime)
elif scenario == Scenario.GYRO_OFF:
logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime)
elif scenario == Scenario.GYRO_SPIKE_MIDWAY:
non_gyro = [x for x in logs if x.which() not in 'gyroscope']
gyro = [x for x in logs if x.which() in 'gyroscope']
temp = gyro[len(gyro) // 2].as_builder()
temp.gyroscope.gyroUncalibrated.v[0] += 3.0
gyro[len(gyro) // 2] = temp.as_reader()
logs = sorted(non_gyro + gyro, key=lambda x: x.logMonoTime)
elif scenario == Scenario.ACCEL_OFF:
logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime)
elif scenario == Scenario.ACCEL_SPIKE_MIDWAY:
non_accel = [x for x in logs if x.which() not in 'accelerometer']
accel = [x for x in logs if x.which() in 'accelerometer']
temp = accel[len(accel) // 2].as_builder()
temp.accelerometer.acceleration.v[0] += 10.0
accel[len(accel) // 2] = temp.as_reader()
logs = sorted(non_accel + accel, key=lambda x: x.logMonoTime)
replayed_logs = replay_process_with_name(name='locationd', lr=logs)
return get_select_fields_data(logs), get_select_fields_data(replayed_logs)
class TestLocationdScenarios(unittest.TestCase):
"""
Test locationd with different scenarios. In all these scenarios, we expect the following:
- locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK)
- faulty values should be ignored, with appropriate flags set
"""
def test_base(self):
"""
Test: unchanged log
Expected Result:
- yaw_rate: unchanged
- roll: unchanged
"""
orig_data, replayed_data = run_scenarios(Scenario.BASE)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
def test_gps_off(self):
"""
Test: no GPS message for the entire segment
Expected Result:
- yaw_rate: unchanged
- roll:
- gpsOK: False
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.all(replayed_data['gps_flag'] == 0.0))
def test_gps_off_midway(self):
"""
Test: no GPS message for the second half of the segment
Expected Result:
- yaw_rate: unchanged
- roll:
- gpsOK: True for the first half, False for the second half
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF_MIDWAY)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.diff(replayed_data['gps_flag'])[512] == -1.0)
def test_gps_on_midway(self):
"""
Test: no GPS message for the first half of the segment
Expected Result:
- yaw_rate: unchanged
- roll:
- gpsOK: False for the first half, True for the second half
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_ON_MIDWAY)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(1.5)))
self.assertTrue(np.diff(replayed_data['gps_flag'])[505] == 1.0)
def test_gps_tunnel(self):
"""
Test: no GPS message for the middle section of the segment
Expected Result:
- yaw_rate: unchanged
- roll:
- gpsOK: False for the middle section, True for the rest
"""
orig_data, replayed_data = run_scenarios(Scenario.GPS_TUNNEL)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.diff(replayed_data['gps_flag'])[213] == -1.0)
self.assertTrue(np.diff(replayed_data['gps_flag'])[805] == 1.0)
def test_gyro_off(self):
"""
Test: no gyroscope message for the entire segment
Expected Result:
- yaw_rate: 0
- roll: 0
- sensorsOK: False
"""
_, replayed_data = run_scenarios(Scenario.GYRO_OFF)
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0))
self.assertTrue(np.allclose(replayed_data['roll'], 0.0))
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0))
def test_gyro_spikes(self):
"""
Test: a gyroscope spike in the middle of the segment
Expected Result:
- yaw_rate: unchanged
- roll: unchanged
- inputsOK: False for some time after the spike, True for the rest
"""
orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
self.assertTrue(np.diff(replayed_data['inputs_flag'])[500] == -1.0)
self.assertTrue(np.diff(replayed_data['inputs_flag'])[694] == 1.0)
def test_accel_off(self):
"""
Test: no accelerometer message for the entire segment
Expected Result:
- yaw_rate: 0
- roll: 0
- sensorsOK: False
"""
_, replayed_data = run_scenarios(Scenario.ACCEL_OFF)
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0))
self.assertTrue(np.allclose(replayed_data['roll'], 0.0))
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0))
def test_accel_spikes(self):
"""
ToDo:
Test: an accelerometer spike in the middle of the segment
Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes.
"""
orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY)
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2)))
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5)))
if __name__ == "__main__":
unittest.main()