You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							232 lines
						
					
					
						
							7.4 KiB
						
					
					
				
			
		
		
	
	
							232 lines
						
					
					
						
							7.4 KiB
						
					
					
				| import os
 | |
| import pytest
 | |
| import time
 | |
| import numpy as np
 | |
| from collections import namedtuple, defaultdict
 | |
| 
 | |
| import cereal.messaging as messaging
 | |
| from cereal import log
 | |
| from cereal.services import SERVICE_LIST
 | |
| from openpilot.common.gpio import get_irqs_for_action
 | |
| from openpilot.common.timeout import Timeout
 | |
| from openpilot.system.hardware import HARDWARE
 | |
| from openpilot.system.manager.process_config import managed_processes
 | |
| 
 | |
# (source name, measurement type) pairs expected from the 'lsm6ds3' source
LSM = {('lsm6ds3', measurement) for measurement in ('acceleration', 'gyroUncalibrated', 'temperature')}

# the same measurement types, reported under the 'lsm6ds3trc' source name
LSM_C = {(source + 'trc', measurement) for source, measurement in LSM}

# (source name, measurement type) pairs expected from the 'mmc5603nj' source
MMC = {('mmc5603nj', 'magneticUncalibrated')}
 | |
| 
 | |
# Accepted sets of (source, measurement type) pairs for the device this runs on.
# A device type that is not listed gets an empty list, i.e. no accepted configuration.
SENSOR_CONFIGURATIONS: list[set] = dict(
  mici=[LSM, LSM_C],
  tizi=[MMC | LSM, MMC | LSM_C],
  tici=[LSM, LSM_C, MMC | LSM, MMC | LSM_C],
).get(HARDWARE.get_device_type(), [])
 | |
| 
 | |
Sensor = log.SensorEventData.SensorSource

# type: measurement field name; sanity_min/sanity_max: inclusive bounds that
# test_sensor_values applies to the mean norm of the collected values
SensorConfig = namedtuple('SensorConfig', ['type', 'sanity_min', 'sanity_max'])

# sanity-check limits for every measurement type, keyed by sensor source
ALL_SENSORS = {
  Sensor.lsm6ds3: {
    SensorConfig(type="acceleration", sanity_min=5, sanity_max=15),
    SensorConfig(type="gyroUncalibrated", sanity_min=0, sanity_max=.2),
    SensorConfig(type="temperature", sanity_min=0, sanity_max=60),
  },

  Sensor.lsm6ds3trc: {
    SensorConfig(type="acceleration", sanity_min=5, sanity_max=15),
    SensorConfig(type="gyroUncalibrated", sanity_min=0, sanity_max=.2),
    SensorConfig(type="temperature", sanity_min=0, sanity_max=60),
  },

  Sensor.mmc5603nj: {
    SensorConfig(type="magneticUncalibrated", sanity_min=0, sanity_max=300),
  },
}
 | |
| 
 | |
| 
 | |
def get_irq_count(irq: int):
  """Return the interrupt count of `irq`, summed over all CPUs.

  The counters are read from /sys/kernel/irq/<irq>/per_cpu_count, which is
  parsed as a comma-separated list of integers.
  """
  with open(f"/sys/kernel/irq/{irq}/per_cpu_count") as f:
    counts = f.read().split(",")
  return sum(int(count) for count in counts)
 | |
| 
 | |
def read_sensor_events(duration_sec):
  """Collect sensor messages for `duration_sec` seconds.

  Subscribes to every sensor service, waits until the poller reports at least
  one ready socket (bounded by the SENSOR_WAIT env var, default 5 seconds),
  discards whatever queued up during startup and then drains all sockets
  every 100ms until the duration has elapsed.

  Returns a dict mapping service name to its list of received messages;
  services that produced nothing are left out. Asserts that at least one
  message was collected overall.
  """
  services = ['accelerometer', 'gyroscope', 'magnetometer', 'accelerometer2',
              'gyroscope2', 'temperatureSensor', 'temperatureSensor2']
  poller = messaging.Poller()
  socks = {name: messaging.sub_sock(name, poller=poller, timeout=100) for name in services}
  events = defaultdict(list)

  # wait for sensors to come up
  with Timeout(int(os.environ.get("SENSOR_WAIT", "5")), "sensors didn't come up"):
    while len(poller.poll(250)) == 0:
      pass

  # give the sensors a moment, then throw away everything received so far
  time.sleep(1)
  for sock in socks.values():
    messaging.drain_sock_raw(sock)

  start = time.monotonic()
  while time.monotonic() - start < duration_sec:
    for name, sock in socks.items():
      events[name].extend(messaging.drain_sock(sock))
    time.sleep(0.1)

  assert any(events.values()), "No sensor events collected!"

  return {name: msgs for name, msgs in events.items() if msgs}
 | |
| 
 | |
@pytest.mark.tici
class TestSensord:
  """Checks of sensord's published sensor events.

  setup_class starts sensord once, records events for SAMPLE_SECS seconds
  (default 10) and stops it again; most tests only inspect that recording.
  test_sensor_verify_no_interrupts_after_stop starts sensord itself.
  """

  @classmethod
  def setup_class(cls):
    # enable LSM self test
    os.environ["LSM_SELF_TEST"] = "1"

    # read initial sensor values every test case can use
    os.system("pkill -f \\\\./sensord")
    try:
      managed_processes["sensord"].start()
      cls.sample_secs = int(os.getenv("SAMPLE_SECS", "10"))
      cls.events = read_sensor_events(cls.sample_secs)

      # determine sensord's irq
      cls.sensord_irq = get_irqs_for_action("sensord")[0]
    finally:
      # teardown won't run if this doesn't succeed
      managed_processes["sensord"].stop()

  @classmethod
  def teardown_class(cls):
    managed_processes["sensord"].stop()

  def teardown_method(self):
    # make sure a test that started sensord never leaves it running
    managed_processes["sensord"].stop()

  def test_sensors_present(self):
    # verify correct sensors configuration
    seen = set()
    for msgs in self.events.values():
      for measurement in msgs:
        m = getattr(measurement, measurement.which())
        seen.add((str(m.source), m.which()))

    assert seen in SENSOR_CONFIGURATIONS

  def test_lsm6ds3_timing(self, subtests):
    # verify measurements are sampled and published at 104Hz

    sensor_t = {
      1: [], # accel
      5: [], # gyro
    }

    for service in ('accelerometer', 'gyroscope'):
      for measurement in self.events[service]:
        m = getattr(measurement, measurement.which())
        sensor_t[m.sensor].append(m.timestamp)

    for s, vals in sensor_t.items():
      with subtests.test(sensor=s):
        # at least two samples are needed to form a single time difference
        assert len(vals) > 1, f"Not enough samples for timing checks: {len(vals)}"
        tdiffs = np.diff(vals) / 1e6 # millis

        high_delay_diffs = [d for d in tdiffs if d >= 20.]
        assert len(high_delay_diffs) < 15, f"Too many large diffs: {high_delay_diffs}"

        avg_diff = sum(tdiffs)/len(tdiffs)
        avg_freq = 1. / (avg_diff * 1e-3)
        assert 92. < avg_freq < 114., f"avg freq {avg_freq}Hz wrong, expected 104Hz"

        stddev = np.std(tdiffs)
        assert stddev < 2.0, f"Standard-dev too big {stddev}"

  def test_sensor_frequency(self, subtests):
    # each service must publish within 15% of its configured frequency
    for s, msgs in self.events.items():
      with subtests.test(sensor=s):
        freq = len(msgs) / self.sample_secs
        ef = SERVICE_LIST[s].frequency
        assert ef*0.85 <= freq <= ef*1.15

  def test_logmonottime_timestamp_diff(self):
    # ensure diff between the message logMonotime and sample timestamp is small

    tdiffs = []
    for msgs in self.events.values():
      for measurement in msgs:
        m = getattr(measurement, measurement.which())

        # check if gyro and accel timestamps are before logMonoTime
        if str(m.source).startswith("lsm6ds3") and m.which() != 'temperature':
          err_msg = f"Timestamp after logMonoTime: {m.timestamp} > {measurement.logMonoTime}"
          assert m.timestamp < measurement.logMonoTime, err_msg

        # negative values might occur, as non interrupt packages created
        # before the sensor is read
        tdiffs.append(abs(measurement.logMonoTime - m.timestamp) / 1e6)

    # the average below is undefined without any measurement
    assert len(tdiffs) > 0, "No measurements to compare"

    # some sensors have a read procedure that will introduce an expected diff on the order of 20ms
    # keep a list (not a set) so that equal diffs are each counted
    high_delay_diffs = [d for d in tdiffs if d >= 25.]
    assert len(high_delay_diffs) < 20, f"Too many measurements published: {high_delay_diffs}"

    avg_diff = round(sum(tdiffs)/len(tdiffs), 4)
    assert avg_diff < 4, f"Avg packet diff: {avg_diff:.1f}ms"

  def test_sensor_values(self):
    # group the raw readings by (raw source id, measurement type)
    sensor_values = defaultdict(list)
    for msgs in self.events.values():
      for measurement in msgs:
        m = getattr(measurement, measurement.which())
        values = getattr(m, m.which())

        # some measurement types wrap their readings in a `v` field
        if hasattr(values, 'v'):
          values = values.v
        sensor_values[(m.source.raw, m.which())].append(np.atleast_1d(values))

    # Sanity check sensor values
    for (sensor, stype), readings in sensor_values.items():
      for s in ALL_SENSORS[sensor]:
        if s.type != stype:
          continue

        mean_norm = np.mean(np.linalg.norm(readings, axis=1))
        err_msg = f"Sensor '{sensor} {s.type}' failed sanity checks {mean_norm} is not between {s.sanity_min} and {s.sanity_max}"
        assert s.sanity_min <= mean_norm <= s.sanity_max, err_msg

  def test_sensor_verify_no_interrupts_after_stop(self):
    managed_processes["sensord"].start()
    time.sleep(3)

    # sample sensord's IRQ counter twice to verify interrupts are received
    state_one = get_irq_count(self.sensord_irq)
    time.sleep(1)
    state_two = get_irq_count(self.sensord_irq)

    error_msg = f"no interrupts received after sensord start!\n{state_one} {state_two}"
    assert state_one != state_two, error_msg

    managed_processes["sensord"].stop()
    time.sleep(1)

    # sample the IRQ counter again to verify no more interrupts are received
    state_one = get_irq_count(self.sensord_irq)
    time.sleep(1)
    state_two = get_irq_count(self.sensord_irq)
    assert state_one == state_two, "Interrupts received after sensord stop!"
 | |
| 
 | |
| 
 |