|  |  |  | @ -1,5 +1,6 @@ | 
			
		
	
		
			
				
					|  |  |  |  | #!/usr/bin/env python3 | 
			
		
	
		
			
				
					|  |  |  |  | import os | 
			
		
	
		
			
				
					|  |  |  |  | import glob | 
			
		
	
		
			
				
					|  |  |  |  | import time | 
			
		
	
		
			
				
					|  |  |  |  | import unittest | 
			
		
	
		
			
				
					|  |  |  |  | import numpy as np | 
			
		
	
	
		
			
				
					|  |  |  | @ -7,7 +8,7 @@ from collections import namedtuple, defaultdict | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | import cereal.messaging as messaging | 
			
		
	
		
			
				
					|  |  |  |  | from cereal import log | 
			
		
	
		
			
				
					|  |  |  |  | from system.hardware import TICI, HARDWARE | 
			
		
	
		
			
				
					|  |  |  |  | from system.hardware import TICI | 
			
		
	
		
			
				
					|  |  |  |  | from selfdrive.manager.process_config import managed_processes | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | BMX = { | 
			
		
	
	
		
			
				
					|  |  |  | @ -70,7 +71,6 @@ ALL_SENSORS = { | 
			
		
	
		
			
				
					|  |  |  |  |   } | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | LSM_IRQ = 336 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def get_irq_count(irq: int): | 
			
		
	
		
			
				
					|  |  |  |  |   with open(f"/sys/kernel/irq/{irq}/per_cpu_count") as f: | 
			
		
	
	
		
			
				
					|  |  |  | @ -101,9 +101,6 @@ class TestSensord(unittest.TestCase): | 
			
		
	
		
			
				
					|  |  |  |  |     if not TICI: | 
			
		
	
		
			
				
					|  |  |  |  |       raise unittest.SkipTest | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # make sure gpiochip0 is readable | 
			
		
	
		
			
				
					|  |  |  |  |     HARDWARE.initialize_hardware() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # enable LSM self test | 
			
		
	
		
			
				
					|  |  |  |  |     os.environ["LSM_SELF_TEST"] = "1" | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | @ -114,6 +111,15 @@ class TestSensord(unittest.TestCase): | 
			
		
	
		
			
				
					|  |  |  |  |       time.sleep(3) | 
			
		
	
		
			
				
					|  |  |  |  |       cls.sample_secs = 10 | 
			
		
	
		
			
				
					|  |  |  |  |       cls.events = read_sensor_events(cls.sample_secs) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # determine sensord's irq | 
			
		
	
		
			
				
					|  |  |  |  |       cls.sensord_irq = None | 
			
		
	
		
			
				
					|  |  |  |  |       for fn in glob.glob('/sys/kernel/irq/*/actions'): | 
			
		
	
		
			
				
					|  |  |  |  |         with open(fn) as f: | 
			
		
	
		
			
				
					|  |  |  |  |           if "sensord" in f.read(): | 
			
		
	
		
			
				
					|  |  |  |  |             cls.sensord_irq = int(fn.split('/')[-2]) | 
			
		
	
		
			
				
					|  |  |  |  |             break | 
			
		
	
		
			
				
					|  |  |  |  |       assert cls.sensord_irq is not None | 
			
		
	
		
			
				
					|  |  |  |  |     finally: | 
			
		
	
		
			
				
					|  |  |  |  |       # teardown won't run if this doesn't succeed | 
			
		
	
		
			
				
					|  |  |  |  |       managed_processes["sensord"].stop() | 
			
		
	
	
		
			
				
					|  |  |  | @ -121,8 +127,6 @@ class TestSensord(unittest.TestCase): | 
			
		
	
		
			
				
					|  |  |  |  |   @classmethod | 
			
		
	
		
			
				
					|  |  |  |  |   def tearDownClass(cls): | 
			
		
	
		
			
				
					|  |  |  |  |     managed_processes["sensord"].stop() | 
			
		
	
		
			
				
					|  |  |  |  |     if "LSM_SELF_TEST" in os.environ: | 
			
		
	
		
			
				
					|  |  |  |  |       del os.environ['LSM_SELF_TEST'] | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def tearDown(self): | 
			
		
	
		
			
				
					|  |  |  |  |     managed_processes["sensord"].stop() | 
			
		
	
	
		
			
				
					|  |  |  | @ -250,9 +254,9 @@ class TestSensord(unittest.TestCase): | 
			
		
	
		
			
				
					|  |  |  |  |     time.sleep(3) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # read /proc/interrupts to verify interrupts are received | 
			
		
	
		
			
				
					|  |  |  |  |     state_one = get_irq_count(LSM_IRQ) | 
			
		
	
		
			
				
					|  |  |  |  |     state_one = get_irq_count(self.sensord_irq) | 
			
		
	
		
			
				
					|  |  |  |  |     time.sleep(1) | 
			
		
	
		
			
				
					|  |  |  |  |     state_two = get_irq_count(LSM_IRQ) | 
			
		
	
		
			
				
					|  |  |  |  |     state_two = get_irq_count(self.sensord_irq) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     error_msg = f"no interrupts received after sensord start!\n{state_one} {state_two}" | 
			
		
	
		
			
				
					|  |  |  |  |     assert state_one != state_two, error_msg | 
			
		
	
	
		
			
				
					|  |  |  | @ -261,9 +265,9 @@ class TestSensord(unittest.TestCase): | 
			
		
	
		
			
				
					|  |  |  |  |     time.sleep(1) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # read /proc/interrupts to verify no more interrupts are received | 
			
		
	
		
			
				
					|  |  |  |  |     state_one = get_irq_count(LSM_IRQ) | 
			
		
	
		
			
				
					|  |  |  |  |     state_one = get_irq_count(self.sensord_irq) | 
			
		
	
		
			
				
					|  |  |  |  |     time.sleep(1) | 
			
		
	
		
			
				
					|  |  |  |  |     state_two = get_irq_count(LSM_IRQ) | 
			
		
	
		
			
				
					|  |  |  |  |     state_two = get_irq_count(self.sensord_irq) | 
			
		
	
		
			
				
					|  |  |  |  |     assert state_one == state_two, "Interrupts received after sensord stop!" | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | 
 |