diff --git a/.github/workflows/selfdrive_tests.yaml b/.github/workflows/selfdrive_tests.yaml index 6503b164df..f82da4195f 100644 --- a/.github/workflows/selfdrive_tests.yaml +++ b/.github/workflows/selfdrive_tests.yaml @@ -264,6 +264,7 @@ jobs: run: | ${{ env.RUN }} "export SKIP_LONG_TESTS=1 && \ $PYTEST -n auto --dist=loadscope --timeout 30 -o cpp_files=test_* && \ + ./selfdrive/locationd/test/test_locationd_scenarios.py && \ ./selfdrive/ui/tests/create_test_translations.sh && \ QT_QPA_PLATFORM=offscreen ./selfdrive/ui/tests/test_translations && \ ./selfdrive/ui/tests/test_translations.py && \ diff --git a/selfdrive/locationd/test/test_locationd_scenarios.py b/selfdrive/locationd/test/test_locationd_scenarios.py new file mode 100755 index 0000000000..f08fe72ff1 --- /dev/null +++ b/selfdrive/locationd/test/test_locationd_scenarios.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +import unittest +import numpy as np +from collections import defaultdict +from enum import Enum + + +from openpilot.selfdrive.test.openpilotci import get_url +from openpilot.tools.lib.logreader import LogReader +from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name + +TEST_ROUTE, TEST_SEG_NUM = "ff2bd20623fcaeaa|2023-09-05--10-14-54", 4 +GPS_MESSAGES = ['gpsLocationExternal', 'gpsLocation'] +SELECT_COMPARE_FIELDS = { + 'yaw_rate': ['angularVelocityCalibrated', 'value', 2], + 'roll': ['orientationNED', 'value', 0], + 'gps_flag': ['gpsOK'], + 'inputs_flag': ['inputsOK'], + 'sensors_flag': ['sensorsOK'], +} +JUNK_IDX = 100 + + +class Scenario(Enum): + BASE = 'base' + GPS_OFF = 'gps_off' + GPS_OFF_MIDWAY = 'gps_off_midway' + GPS_ON_MIDWAY = 'gps_on_midway' + GPS_TUNNEL = 'gps_tunnel' + GYRO_OFF = 'gyro_off' + GYRO_SPIKE_MIDWAY = 'gyro_spike_midway' + ACCEL_OFF = 'accel_off' + ACCEL_SPIKE_MIDWAY = 'accel_spike_midway' + + +def get_select_fields_data(logs): + def get_nested_keys(msg, keys): + val = None + for key in keys: + val = getattr(msg if val is None else val, key) if isinstance(key, str) else val[key] + return val + llk = [x.liveLocationKalman for x in logs if x.which() == 'liveLocationKalman'] + data = defaultdict(list) + for msg in llk: + for key, fields in SELECT_COMPARE_FIELDS.items(): + data[key].append(get_nested_keys(msg, fields)) + for key in data: + data[key] = np.array(data[key][JUNK_IDX:], dtype=float) + return data + + +def run_scenarios(scenario): + logs = list(LogReader(get_url(TEST_ROUTE, TEST_SEG_NUM))) + if scenario == Scenario.BASE: + pass + + elif scenario == Scenario.GPS_OFF: + logs = sorted([x for x in logs if x.which() not in GPS_MESSAGES], key=lambda x: x.logMonoTime) + + elif scenario == Scenario.GPS_OFF_MIDWAY: + non_gps = [x for x in logs if x.which() not in GPS_MESSAGES] + gps = [x for x in logs if x.which() in GPS_MESSAGES] + logs = sorted(non_gps + gps[: len(gps) // 2], key=lambda x: x.logMonoTime) + + elif scenario == Scenario.GPS_ON_MIDWAY: + non_gps = [x for x in logs if x.which() not in GPS_MESSAGES] + gps = [x for x in logs if x.which() in GPS_MESSAGES] + logs = sorted(non_gps + gps[len(gps) // 2:], key=lambda x: x.logMonoTime) + + elif scenario == Scenario.GPS_TUNNEL: + non_gps = [x for x in logs if x.which() not in GPS_MESSAGES] + gps = [x for x in logs if x.which() in GPS_MESSAGES] + logs = sorted(non_gps + gps[:len(gps) // 4] + gps[-len(gps) // 4:], key=lambda x: x.logMonoTime) + + elif scenario == Scenario.GYRO_OFF: + logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime) + + elif scenario == Scenario.GYRO_SPIKE_MIDWAY: + non_gyro = [x for x in logs if x.which() not in 'gyroscope'] + gyro = [x for x in logs if x.which() in 'gyroscope'] + temp = gyro[len(gyro) // 2].as_builder() + temp.gyroscope.gyroUncalibrated.v[0] += 3.0 + gyro[len(gyro) // 2] = temp.as_reader() + logs = sorted(non_gyro + gyro, key=lambda x: x.logMonoTime) + + elif scenario == Scenario.ACCEL_OFF: + logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime) + + elif scenario == Scenario.ACCEL_SPIKE_MIDWAY: + non_accel = [x for x in logs if x.which() not in 'accelerometer'] + accel = [x for x in logs if x.which() in 'accelerometer'] + temp = accel[len(accel) // 2].as_builder() + temp.accelerometer.acceleration.v[0] += 10.0 + accel[len(accel) // 2] = temp.as_reader() + logs = sorted(non_accel + accel, key=lambda x: x.logMonoTime) + + replayed_logs = replay_process_with_name(name='locationd', lr=logs) + return get_select_fields_data(logs), get_select_fields_data(replayed_logs) + + +class TestLocationdScenarios(unittest.TestCase): + """ + Test locationd with different scenarios. In all these scenarios, we expect the following: + - locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK) + - faulty values should be ignored, with appropriate flags set + """ + def test_base(self): + """ + Test: unchanged log + Expected Result: + - yaw_rate: unchanged + - roll: unchanged + """ + orig_data, replayed_data = run_scenarios(Scenario.BASE) + self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) + self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) + + def test_gps_off(self): + """ + Test: no GPS message for the entire segment + Expected Result: + - yaw_rate: unchanged + - roll: + - gpsOK: False + """ + orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF) + self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) + self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) + self.assertTrue(np.all(replayed_data['gps_flag'] == 0.0)) + + def test_gps_off_midway(self): + """ + Test: no GPS message for the second half of the segment + Expected Result: + - yaw_rate: unchanged + - roll: + - gpsOK: True for the first half, False for the second half + """ + orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF_MIDWAY) + self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) + self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) + self.assertTrue(np.diff(replayed_data['gps_flag'])[512] == -1.0) + + def test_gps_on_midway(self): + """ + Test: no GPS message for the first half of the segment + Expected Result: + - yaw_rate: unchanged + - roll: + - gpsOK: False for the first half, True for the second half + """ + orig_data, replayed_data = run_scenarios(Scenario.GPS_ON_MIDWAY) + self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) + self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(1.5))) + self.assertTrue(np.diff(replayed_data['gps_flag'])[505] == 1.0) + + def test_gps_tunnel(self): + """ + Test: no GPS message for the middle section of the segment + Expected Result: + - yaw_rate: unchanged + - roll: + - gpsOK: False for the middle section, True for the rest + """ + orig_data, replayed_data = run_scenarios(Scenario.GPS_TUNNEL) + self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) + self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) + self.assertTrue(np.diff(replayed_data['gps_flag'])[213] == -1.0) + self.assertTrue(np.diff(replayed_data['gps_flag'])[805] == 1.0) + + def test_gyro_off(self): + """ + Test: no gyroscope message for the entire segment + Expected Result: + - yaw_rate: 0 + - roll: 0 + - sensorsOK: False + """ + _, replayed_data = run_scenarios(Scenario.GYRO_OFF) + self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0)) + self.assertTrue(np.allclose(replayed_data['roll'], 0.0)) + self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0)) + + def test_gyro_spikes(self): + """ + Test: a gyroscope spike in the middle of the segment + Expected Result: + - yaw_rate: unchanged + - roll: unchanged + - inputsOK: False for some time after the spike, True for the rest + """ + orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY) + self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) + self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) + self.assertTrue(np.diff(replayed_data['inputs_flag'])[500] == -1.0) + self.assertTrue(np.diff(replayed_data['inputs_flag'])[694] == 1.0) + + def test_accel_off(self): + """ + Test: no accelerometer message for the entire segment + Expected Result: + - yaw_rate: 0 + - roll: 0 + - sensorsOK: False + """ + _, replayed_data = run_scenarios(Scenario.ACCEL_OFF) + self.assertTrue(np.allclose(replayed_data['yaw_rate'], 0.0)) + self.assertTrue(np.allclose(replayed_data['roll'], 0.0)) + self.assertTrue(np.all(replayed_data['sensors_flag'] == 0.0)) + + def test_accel_spikes(self): + """ + ToDo: + Test: an accelerometer spike in the middle of the segment + Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes. + """ + orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY) + self.assertTrue(np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))) + self.assertTrue(np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))) + + +if __name__ == "__main__": + unittest.main()