|
|
|
@ -1,5 +1,6 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
import os |
|
|
|
|
import glob |
|
|
|
|
import time |
|
|
|
|
import unittest |
|
|
|
|
import numpy as np |
|
|
|
@ -7,7 +8,7 @@ from collections import namedtuple, defaultdict |
|
|
|
|
|
|
|
|
|
import cereal.messaging as messaging |
|
|
|
|
from cereal import log |
|
|
|
|
from system.hardware import TICI, HARDWARE |
|
|
|
|
from system.hardware import TICI |
|
|
|
|
from selfdrive.manager.process_config import managed_processes |
|
|
|
|
|
|
|
|
|
BMX = { |
|
|
|
@ -70,7 +71,6 @@ ALL_SENSORS = { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
LSM_IRQ = 336 |
|
|
|
|
|
|
|
|
|
def get_irq_count(irq: int): |
|
|
|
|
with open(f"/sys/kernel/irq/{irq}/per_cpu_count") as f: |
|
|
|
@ -101,9 +101,6 @@ class TestSensord(unittest.TestCase): |
|
|
|
|
if not TICI: |
|
|
|
|
raise unittest.SkipTest |
|
|
|
|
|
|
|
|
|
# make sure gpiochip0 is readable |
|
|
|
|
HARDWARE.initialize_hardware() |
|
|
|
|
|
|
|
|
|
# enable LSM self test |
|
|
|
|
os.environ["LSM_SELF_TEST"] = "1" |
|
|
|
|
|
|
|
|
@ -114,6 +111,15 @@ class TestSensord(unittest.TestCase): |
|
|
|
|
time.sleep(3) |
|
|
|
|
cls.sample_secs = 10 |
|
|
|
|
cls.events = read_sensor_events(cls.sample_secs) |
|
|
|
|
|
|
|
|
|
# determine sensord's irq |
|
|
|
|
cls.sensord_irq = None |
|
|
|
|
for fn in glob.glob('/sys/kernel/irq/*/actions'): |
|
|
|
|
with open(fn) as f: |
|
|
|
|
if "sensord" in f.read(): |
|
|
|
|
cls.sensord_irq = int(fn.split('/')[-2]) |
|
|
|
|
break |
|
|
|
|
assert cls.sensord_irq is not None |
|
|
|
|
finally: |
|
|
|
|
# teardown won't run if this doesn't succeed |
|
|
|
|
managed_processes["sensord"].stop() |
|
|
|
@ -121,8 +127,6 @@ class TestSensord(unittest.TestCase): |
|
|
|
|
@classmethod |
|
|
|
|
def tearDownClass(cls): |
|
|
|
|
managed_processes["sensord"].stop() |
|
|
|
|
if "LSM_SELF_TEST" in os.environ: |
|
|
|
|
del os.environ['LSM_SELF_TEST'] |
|
|
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
|
managed_processes["sensord"].stop() |
|
|
|
@ -250,9 +254,9 @@ class TestSensord(unittest.TestCase): |
|
|
|
|
time.sleep(3) |
|
|
|
|
|
|
|
|
|
# read /proc/interrupts to verify interrupts are received |
|
|
|
|
state_one = get_irq_count(LSM_IRQ) |
|
|
|
|
state_one = get_irq_count(self.sensord_irq) |
|
|
|
|
time.sleep(1) |
|
|
|
|
state_two = get_irq_count(LSM_IRQ) |
|
|
|
|
state_two = get_irq_count(self.sensord_irq) |
|
|
|
|
|
|
|
|
|
error_msg = f"no interrupts received after sensord start!\n{state_one} {state_two}" |
|
|
|
|
assert state_one != state_two, error_msg |
|
|
|
@ -261,9 +265,9 @@ class TestSensord(unittest.TestCase): |
|
|
|
|
time.sleep(1) |
|
|
|
|
|
|
|
|
|
# read /proc/interrupts to verify no more interrupts are received |
|
|
|
|
state_one = get_irq_count(LSM_IRQ) |
|
|
|
|
state_one = get_irq_count(self.sensord_irq) |
|
|
|
|
time.sleep(1) |
|
|
|
|
state_two = get_irq_count(LSM_IRQ) |
|
|
|
|
state_two = get_irq_count(self.sensord_irq) |
|
|
|
|
assert state_one == state_two, "Interrupts received after sensord stop!" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|