locationd: Unit tests for different scenarios (#29908)
* add different scenarios for locationd
* add locationd scenario tests to unit tests
old-commit-hash: e9edf1300a
test-msgs
parent
af2b9e744d
commit
244a2fc793
2 changed files with 224 additions and 0 deletions
@ -0,0 +1,223 @@ |
||||
#!/usr/bin/env python3 |
||||
import unittest |
||||
import numpy as np |
||||
from collections import defaultdict |
||||
from enum import Enum |
||||
|
||||
|
||||
from openpilot.selfdrive.test.openpilotci import get_url |
||||
from openpilot.tools.lib.logreader import LogReader |
||||
from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name |
||||
|
||||
TEST_ROUTE, TEST_SEG_NUM = "ff2bd20623fcaeaa|2023-09-05--10-14-54", 4 |
||||
GPS_MESSAGES = ['gpsLocationExternal', 'gpsLocation'] |
||||
SELECT_COMPARE_FIELDS = { |
||||
'yaw_rate': ['angularVelocityCalibrated', 'value', 2], |
||||
'roll': ['orientationNED', 'value', 0], |
||||
'gps_flag': ['gpsOK'], |
||||
'inputs_flag': ['inputsOK'], |
||||
'sensors_flag': ['sensorsOK'], |
||||
} |
||||
JUNK_IDX = 100 |
||||
|
||||
|
||||
class Scenario(Enum): |
||||
BASE = 'base' |
||||
GPS_OFF = 'gps_off' |
||||
GPS_OFF_MIDWAY = 'gps_off_midway' |
||||
GPS_ON_MIDWAY = 'gps_on_midway' |
||||
GPS_TUNNEL = 'gps_tunnel' |
||||
GYRO_OFF = 'gyro_off' |
||||
GYRO_SPIKE_MIDWAY = 'gyro_spike_midway' |
||||
ACCEL_OFF = 'accel_off' |
||||
ACCEL_SPIKE_MIDWAY = 'accel_spike_midway' |
||||
|
||||
|
||||
def get_select_fields_data(logs): |
||||
def get_nested_keys(msg, keys): |
||||
val = None |
||||
for key in keys: |
||||
val = getattr(msg if val is None else val, key) if isinstance(key, str) else val[key] |
||||
return val |
||||
llk = [x.liveLocationKalman for x in logs if x.which() == 'liveLocationKalman'] |
||||
data = defaultdict(list) |
||||
for msg in llk: |
||||
for key, fields in SELECT_COMPARE_FIELDS.items(): |
||||
data[key].append(get_nested_keys(msg, fields)) |
||||
for key in data: |
||||
data[key] = np.array(data[key][JUNK_IDX:], dtype=float) |
||||
return data |
||||
|
||||
|
||||
def run_scenarios(scenario): |
||||
logs = list(LogReader(get_url(TEST_ROUTE, TEST_SEG_NUM))) |
||||
if scenario == Scenario.BASE: |
||||
pass |
||||
|
||||
elif scenario == Scenario.GPS_OFF: |
||||
logs = sorted([x for x in logs if x.which() not in GPS_MESSAGES], key=lambda x: x.logMonoTime) |
||||
|
||||
elif scenario == Scenario.GPS_OFF_MIDWAY: |
||||
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES] |
||||
gps = [x for x in logs if x.which() in GPS_MESSAGES] |
||||
logs = sorted(non_gps + gps[: len(gps) // 2], key=lambda x: x.logMonoTime) |
||||
|
||||
elif scenario == Scenario.GPS_ON_MIDWAY: |
||||
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES] |
||||
gps = [x for x in logs if x.which() in GPS_MESSAGES] |
||||
logs = sorted(non_gps + gps[len(gps) // 2:], key=lambda x: x.logMonoTime) |
||||
|
||||
elif scenario == Scenario.GPS_TUNNEL: |
||||
non_gps = [x for x in logs if x.which() not in GPS_MESSAGES] |
||||
gps = [x for x in logs if x.which() in GPS_MESSAGES] |
||||
logs = sorted(non_gps + gps[:len(gps) // 4] + gps[-len(gps) // 4:], key=lambda x: x.logMonoTime) |
||||
|
||||
elif scenario == Scenario.GYRO_OFF: |
||||
logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime) |
||||
|
||||
elif scenario == Scenario.GYRO_SPIKE_MIDWAY: |
||||
non_gyro = [x for x in logs if x.which() not in 'gyroscope'] |
||||
gyro = [x for x in logs if x.which() in 'gyroscope'] |
||||
temp = gyro[len(gyro) // 2].as_builder() |
||||
temp.gyroscope.gyroUncalibrated.v[0] += 3.0 |
||||
gyro[len(gyro) // 2] = temp.as_reader() |
||||
logs = sorted(non_gyro + gyro, key=lambda x: x.logMonoTime) |
||||
|
||||
elif scenario == Scenario.ACCEL_OFF: |
||||
logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime) |
||||
|
||||
elif scenario == Scenario.ACCEL_SPIKE_MIDWAY: |
||||
non_accel = [x for x in logs if x.which() not in 'accelerometer'] |
||||
accel = [x for x in logs if x.which() in 'accelerometer'] |
||||
temp = accel[len(accel) // 2].as_builder() |
||||
temp.accelerometer.acceleration.v[0] += 10.0 |
||||
accel[len(accel) // 2] = temp.as_reader() |
||||
logs = sorted(non_accel + accel, key=lambda x: x.logMonoTime) |
||||
|
||||
replayed_logs = replay_process_with_name(name='locationd', lr=logs) |
||||
return get_select_fields_data(logs), get_select_fields_data(replayed_logs) |
||||
|
||||
|
||||
class TestLocationdScenarios(unittest.TestCase): |
||||
""" |
||||
Test locationd with different scenarios. In all these scenarios, we expect the following: |
||||
- locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK) |
||||
- faulty values should be ignored, with appropriate flags set |
||||
""" |
||||
def test_base(self): |
||||
""" |
||||
Test: unchanged log |
||||
Expected Result: |
||||
- yaw_rate: unchanged |
||||
- roll: unchanged |
||||
""" |
||||
orig_data, replayed_data = run_scenarios(Scenario.BASE) |
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) |
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) |
||||
|
||||
def test_gps_off(self): |
||||
""" |
||||
Test: no GPS message for the entire segment |
||||
Expected Result: |
||||
- yaw_rate: unchanged |
||||
- roll: |
||||
- gpsOK: False |
||||
""" |
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF) |
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) |
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) |
||||
self.assertTrue(np.all(replayed_data['gps_flag'] == 0.0)) |
||||
|
||||
def test_gps_off_midway(self): |
||||
""" |
||||
Test: no GPS message for the second half of the segment |
||||
Expected Result: |
||||
- yaw_rate: unchanged |
||||
- roll: |
||||
- gpsOK: True for the first half, False for the second half |
||||
""" |
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF_MIDWAY) |
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) |
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) |
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[512] == -1.0) |
||||
|
||||
def test_gps_on_midway(self): |
||||
""" |
||||
Test: no GPS message for the first half of the segment |
||||
Expected Result: |
||||
- yaw_rate: unchanged |
||||
- roll: |
||||
- gpsOK: False for the first half, True for the second half |
||||
""" |
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_ON_MIDWAY) |
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) |
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(1.5))) |
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[505] == 1.0) |
||||
|
||||
def test_gps_tunnel(self): |
||||
""" |
||||
Test: no GPS message for the middle section of the segment |
||||
Expected Result: |
||||
- yaw_rate: unchanged |
||||
- roll: |
||||
- gpsOK: False for the middle section, True for the rest |
||||
""" |
||||
orig_data, replayed_data = run_scenarios(Scenario.GPS_TUNNEL) |
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) |
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) |
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[213] == -1.0) |
||||
self.assertTrue(np.diff(replayed_data['gps_flag'])[805] == 1.0) |
||||
|
||||
def test_gyro_off(self): |
||||
""" |
||||
Test: no gyroscope message for the entire segment |
||||
Expected Result: |
||||
- yaw_rate: 0 |
||||
- roll: 0 |
||||
- sensorsOK: False |
||||
""" |
||||
_, replayed_data = run_scenarios(Scenario.GYRO_OFF) |
||||
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0)) |
||||
self.assertTrue(np.allclose(replayed_data['roll'], 0.0)) |
||||
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0)) |
||||
|
||||
def test_gyro_spikes(self): |
||||
""" |
||||
Test: a gyroscope spike in the middle of the segment |
||||
Expected Result: |
||||
- yaw_rate: unchanged |
||||
- roll: unchanged |
||||
- inputsOK: False for some time after the spike, True for the rest |
||||
""" |
||||
orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY) |
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) |
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) |
||||
self.assertTrue(np.diff(replayed_data['inputs_flag'])[500] == -1.0) |
||||
self.assertTrue(np.diff(replayed_data['inputs_flag'])[694] == 1.0) |
||||
|
||||
def test_accel_off(self): |
||||
""" |
||||
Test: no accelerometer message for the entire segment |
||||
Expected Result: |
||||
- yaw_rate: 0 |
||||
- roll: 0 |
||||
- sensorsOK: False |
||||
""" |
||||
_, replayed_data = run_scenarios(Scenario.ACCEL_OFF) |
||||
self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0)) |
||||
self.assertTrue(np.allclose(replayed_data['roll'], 0.0)) |
||||
self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0)) |
||||
|
||||
def test_accel_spikes(self): |
||||
""" |
||||
ToDo: |
||||
Test: an accelerometer spike in the middle of the segment |
||||
Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes. |
||||
""" |
||||
orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY) |
||||
self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) |
||||
self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
Loading…
Reference in new issue