|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import pytest
 | 
					
						
							|  |  |  | import numpy as np
 | 
					
						
							|  |  |  | from collections import defaultdict
 | 
					
						
							|  |  |  | from enum import Enum
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from openpilot.tools.lib.logreader import LogReader
 | 
					
						
							|  |  |  | from openpilot.selfdrive.test.process_replay.migration import migrate_all
 | 
					
						
							|  |  |  | from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | TEST_ROUTE = "ff2bd20623fcaeaa|2023-09-05--10-14-54/4"
 | 
					
						
							|  |  |  | GPS_MESSAGES = ['gpsLocationExternal', 'gpsLocation']
 | 
					
						
							|  |  |  | SELECT_COMPARE_FIELDS = {
 | 
					
						
							|  |  |  |   'yaw_rate': ['angularVelocityCalibrated', 'value', 2],
 | 
					
						
							|  |  |  |   'roll': ['orientationNED', 'value', 0],
 | 
					
						
							|  |  |  |   'gps_flag': ['gpsOK'],
 | 
					
						
							|  |  |  |   'inputs_flag': ['inputsOK'],
 | 
					
						
							|  |  |  |   'sensors_flag': ['sensorsOK'],
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | JUNK_IDX = 100
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Scenario(Enum):
 | 
					
						
							|  |  |  |   BASE = 'base'
 | 
					
						
							|  |  |  |   GPS_OFF = 'gps_off'
 | 
					
						
							|  |  |  |   GPS_OFF_MIDWAY = 'gps_off_midway'
 | 
					
						
							|  |  |  |   GPS_ON_MIDWAY = 'gps_on_midway'
 | 
					
						
							|  |  |  |   GPS_TUNNEL = 'gps_tunnel'
 | 
					
						
							|  |  |  |   GYRO_OFF = 'gyro_off'
 | 
					
						
							|  |  |  |   GYRO_SPIKE_MIDWAY = 'gyro_spike_midway'
 | 
					
						
							|  |  |  |   ACCEL_OFF = 'accel_off'
 | 
					
						
							|  |  |  |   ACCEL_SPIKE_MIDWAY = 'accel_spike_midway'
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_select_fields_data(logs):
 | 
					
						
							|  |  |  |   def get_nested_keys(msg, keys):
 | 
					
						
							|  |  |  |     val = None
 | 
					
						
							|  |  |  |     for key in keys:
 | 
					
						
							|  |  |  |       val = getattr(msg if val is None else val, key) if isinstance(key, str) else val[key]
 | 
					
						
							|  |  |  |     return val
 | 
					
						
							|  |  |  |   llk = [x.liveLocationKalman for x in logs if x.which() == 'liveLocationKalman']
 | 
					
						
							|  |  |  |   data = defaultdict(list)
 | 
					
						
							|  |  |  |   for msg in llk:
 | 
					
						
							|  |  |  |     for key, fields in SELECT_COMPARE_FIELDS.items():
 | 
					
						
							|  |  |  |       data[key].append(get_nested_keys(msg, fields))
 | 
					
						
							|  |  |  |   for key in data:
 | 
					
						
							|  |  |  |     data[key] = np.array(data[key][JUNK_IDX:], dtype=float)
 | 
					
						
							|  |  |  |   return data
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def run_scenarios(scenario, logs):
 | 
					
						
							|  |  |  |   if scenario == Scenario.BASE:
 | 
					
						
							|  |  |  |     pass
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.GPS_OFF:
 | 
					
						
							|  |  |  |     logs = sorted([x for x in logs if x.which() not in GPS_MESSAGES], key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.GPS_OFF_MIDWAY:
 | 
					
						
							|  |  |  |     non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
 | 
					
						
							|  |  |  |     gps = [x for x in logs if x.which() in GPS_MESSAGES]
 | 
					
						
							|  |  |  |     logs = sorted(non_gps + gps[: len(gps) // 2], key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.GPS_ON_MIDWAY:
 | 
					
						
							|  |  |  |     non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
 | 
					
						
							|  |  |  |     gps = [x for x in logs if x.which() in GPS_MESSAGES]
 | 
					
						
							|  |  |  |     logs = sorted(non_gps + gps[len(gps) // 2:], key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.GPS_TUNNEL:
 | 
					
						
							|  |  |  |     non_gps = [x for x in logs if x.which() not in GPS_MESSAGES]
 | 
					
						
							|  |  |  |     gps = [x for x in logs if x.which() in GPS_MESSAGES]
 | 
					
						
							|  |  |  |     logs = sorted(non_gps + gps[:len(gps) // 4] + gps[-len(gps) // 4:], key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.GYRO_OFF:
 | 
					
						
							|  |  |  |     logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.GYRO_SPIKE_MIDWAY:
 | 
					
						
							|  |  |  |     non_gyro = [x for x in logs if x.which() not in 'gyroscope']
 | 
					
						
							|  |  |  |     gyro = [x for x in logs if x.which() in 'gyroscope']
 | 
					
						
							|  |  |  |     temp = gyro[len(gyro) // 2].as_builder()
 | 
					
						
							|  |  |  |     temp.gyroscope.gyroUncalibrated.v[0] += 3.0
 | 
					
						
							|  |  |  |     gyro[len(gyro) // 2] = temp.as_reader()
 | 
					
						
							|  |  |  |     logs = sorted(non_gyro + gyro, key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.ACCEL_OFF:
 | 
					
						
							|  |  |  |     logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   elif scenario == Scenario.ACCEL_SPIKE_MIDWAY:
 | 
					
						
							|  |  |  |     non_accel = [x for x in logs if x.which() not in 'accelerometer']
 | 
					
						
							|  |  |  |     accel = [x for x in logs if x.which() in 'accelerometer']
 | 
					
						
							|  |  |  |     temp = accel[len(accel) // 2].as_builder()
 | 
					
						
							|  |  |  |     temp.accelerometer.acceleration.v[0] += 10.0
 | 
					
						
							|  |  |  |     accel[len(accel) // 2] = temp.as_reader()
 | 
					
						
							|  |  |  |     logs = sorted(non_accel + accel, key=lambda x: x.logMonoTime)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   replayed_logs = replay_process_with_name(name='locationd', lr=logs)
 | 
					
						
							|  |  |  |   return get_select_fields_data(logs), get_select_fields_data(replayed_logs)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.mark.xdist_group("test_locationd_scenarios")
 | 
					
						
							|  |  |  | @pytest.mark.shared_download_cache
 | 
					
						
							|  |  |  | class TestLocationdScenarios:
 | 
					
						
							|  |  |  |   """
 | 
					
						
							|  |  |  |   Test locationd with different scenarios. In all these scenarios, we expect the following:
 | 
					
						
							|  |  |  |     - locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK)
 | 
					
						
							|  |  |  |     - faulty values should be ignored, with appropriate flags set
 | 
					
						
							|  |  |  |   """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @classmethod
 | 
					
						
							|  |  |  |   def setup_class(cls):
 | 
					
						
							|  |  |  |     cls.logs = migrate_all(LogReader(TEST_ROUTE))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_base(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: unchanged log
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: unchanged
 | 
					
						
							|  |  |  |       - roll: unchanged
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     orig_data, replayed_data = run_scenarios(Scenario.BASE, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_gps_off(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: no GPS message for the entire segment
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: unchanged
 | 
					
						
							|  |  |  |       - roll:
 | 
					
						
							|  |  |  |       - gpsOK: False
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))
 | 
					
						
							|  |  |  |     assert np.all(replayed_data['gps_flag'] == 0.0)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_gps_off_midway(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: no GPS message for the second half of the segment
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: unchanged
 | 
					
						
							|  |  |  |       - roll:
 | 
					
						
							|  |  |  |       - gpsOK: True for the first half, False for the second half
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     orig_data, replayed_data = run_scenarios(Scenario.GPS_OFF_MIDWAY, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))
 | 
					
						
							|  |  |  |     assert np.diff(replayed_data['gps_flag'])[512] == -1.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_gps_on_midway(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: no GPS message for the first half of the segment
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: unchanged
 | 
					
						
							|  |  |  |       - roll:
 | 
					
						
							|  |  |  |       - gpsOK: False for the first half, True for the second half
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     orig_data, replayed_data = run_scenarios(Scenario.GPS_ON_MIDWAY, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(1.5))
 | 
					
						
							|  |  |  |     assert np.diff(replayed_data['gps_flag'])[505] == 1.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_gps_tunnel(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: no GPS message for the middle section of the segment
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: unchanged
 | 
					
						
							|  |  |  |       - roll:
 | 
					
						
							|  |  |  |       - gpsOK: False for the middle section, True for the rest
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     orig_data, replayed_data = run_scenarios(Scenario.GPS_TUNNEL, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))
 | 
					
						
							|  |  |  |     assert np.diff(replayed_data['gps_flag'])[213] == -1.0
 | 
					
						
							|  |  |  |     assert np.diff(replayed_data['gps_flag'])[805] == 1.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_gyro_off(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: no gyroscope message for the entire segment
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: 0
 | 
					
						
							|  |  |  |       - roll: 0
 | 
					
						
							|  |  |  |       - sensorsOK: False
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     _, replayed_data = run_scenarios(Scenario.GYRO_OFF, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(replayed_data['yaw_rate'], 0.0)
 | 
					
						
							|  |  |  |     assert np.allclose(replayed_data['roll'], 0.0)
 | 
					
						
							|  |  |  |     assert np.all(replayed_data['sensors_flag'] == 0.0)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_gyro_spikes(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: a gyroscope spike in the middle of the segment
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: unchanged
 | 
					
						
							|  |  |  |       - roll: unchanged
 | 
					
						
							|  |  |  |       - inputsOK: False for some time after the spike, True for the rest
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))
 | 
					
						
							|  |  |  |     assert np.diff(replayed_data['inputs_flag'])[500] == -1.0
 | 
					
						
							|  |  |  |     assert np.diff(replayed_data['inputs_flag'])[694] == 1.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_accel_off(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Test: no accelerometer message for the entire segment
 | 
					
						
							|  |  |  |     Expected Result:
 | 
					
						
							|  |  |  |       - yaw_rate: 0
 | 
					
						
							|  |  |  |       - roll: 0
 | 
					
						
							|  |  |  |       - sensorsOK: False
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     _, replayed_data = run_scenarios(Scenario.ACCEL_OFF, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(replayed_data['yaw_rate'], 0.0)
 | 
					
						
							|  |  |  |     assert np.allclose(replayed_data['roll'], 0.0)
 | 
					
						
							|  |  |  |     assert np.all(replayed_data['sensors_flag'] == 0.0)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_accel_spikes(self):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     ToDo:
 | 
					
						
							|  |  |  |     Test: an accelerometer spike in the middle of the segment
 | 
					
						
							|  |  |  |     Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes.
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY, self.logs)
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.2))
 | 
					
						
							|  |  |  |     assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.5))
 |