You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							78 lines
						
					
					
						
							1.9 KiB
						
					
					
				
			
		
		
	
	
							78 lines
						
					
					
						
							1.9 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| 
 | |
| import time
 | |
| import unittest
 | |
| 
 | |
| import cereal.messaging as messaging
 | |
| from selfdrive.hardware import TICI
 | |
| from selfdrive.test.helpers import with_processes
 | |
| 
 | |
# Length, in seconds, of the window during which sensor events are collected.
TEST_TIMESPAN = 10

# Accepted sensor line-ups. Each entry is a set of (source, measurement type)
# pairs; the set of pairs observed during the test must equal one entry exactly.
SENSOR_CONFIGURATIONS = (
  {
    ('bmx055', 'acceleration'),
    ('bmx055', 'gyroUncalibrated'),
    ('bmx055', 'magneticUncalibrated'),
    ('bmx055', 'temperature'),
    ('lsm6ds3', 'acceleration'),
    ('lsm6ds3', 'gyroUncalibrated'),
    ('lsm6ds3', 'temperature'),
    ('rpr0521', 'light'),
  },
  {
    ('lsm6ds3', 'acceleration'),
    ('lsm6ds3', 'gyroUncalibrated'),
    ('lsm6ds3', 'temperature'),
    ('mmc5603nj', 'magneticUncalibrated'),
    ('rpr0521', 'light'),
  },
  {
    ('bmx055', 'acceleration'),
    ('bmx055', 'gyroUncalibrated'),
    ('bmx055', 'magneticUncalibrated'),
    ('bmx055', 'temperature'),
    ('lsm6ds3trc', 'acceleration'),
    ('lsm6ds3trc', 'gyroUncalibrated'),
    ('lsm6ds3trc', 'temperature'),
    ('rpr0521', 'light'),
  },
  {
    ('lsm6ds3trc', 'acceleration'),
    ('lsm6ds3trc', 'gyroUncalibrated'),
    ('lsm6ds3trc', 'temperature'),
    ('mmc5603nj', 'magneticUncalibrated'),
    ('rpr0521', 'light'),
  },
)
 | |
| 
 | |
| 
 | |
class TestSensord(unittest.TestCase):
  """Verify that the sensors reporting on `sensorEvents` form a known configuration."""

  @classmethod
  def setUpClass(cls):
    """Skip every test in this class when the TICI flag is not set.

    Raises:
      unittest.SkipTest: if `TICI` is falsy.
    """
    if not TICI:
      raise unittest.SkipTest("TICI is not set; skipping sensord test")

  # NOTE(review): with_processes presumably runs 'sensord' for the duration of
  # the test -- confirm against selfdrive.test.helpers.
  @with_processes(['sensord'])
  def test_sensors_present(self):
    """Collect sensor events for TEST_TIMESPAN seconds and check what was seen.

    Every measurement with a non-zero version contributes a
    (source, measurement type) pair; the resulting set must equal one of the
    entries in SENSOR_CONFIGURATIONS.
    """
    sensor_events = messaging.sub_sock("sensorEvents", timeout=0.1)

    # Use the monotonic clock so the collection window keeps its intended
    # length even if the wall clock is adjusted while the test is running.
    start_time_sec = time.monotonic()
    events = []
    while time.monotonic() - start_time_sec < TEST_TIMESPAN:
      events += messaging.drain_sock(sensor_events)
      time.sleep(0.01)

    seen = set()
    for event in events:
      for measurement in event.sensorEvents:
        # Filter out unset events
        if measurement.version == 0:
          continue
        seen.add((str(measurement.source), measurement.which()))

    # Membership in a tuple compares by equality, so `seen` must match one
    # configuration exactly (no missing and no extra sensors).
    self.assertIn(seen, SENSOR_CONFIGURATIONS)
 | |
| 
 | |
| 
 | |
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
  unittest.main()
 | |
| 
 |