sensord: test updates for provisioning (#29768)

* sensord: test updates for provisioning

* fix

---------

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: 0908be80bd
test-msgs
Adeeb Shihadeh 2 years ago committed by GitHub
parent 21b75589d1
commit 6a3da36632
  1. 25
      system/sensord/tests/test_sensord.py

@ -9,6 +9,7 @@ import cereal.messaging as messaging
from cereal import log
from cereal.services import service_list
from openpilot.common.gpio import get_irqs_for_action
from openpilot.common.timeout import Timeout
from openpilot.system.hardware import TICI
from openpilot.selfdrive.manager.process_config import managed_processes
@ -73,15 +74,24 @@ def get_irq_count(irq: int):
def read_sensor_events(duration_sec):
sensor_types = ['accelerometer', 'gyroscope', 'magnetometer', 'accelerometer2',
'gyroscope2', 'temperatureSensor', 'temperatureSensor2']
esocks = {}
socks = {}
poller = messaging.Poller()
events = defaultdict(list)
for stype in sensor_types:
esocks[stype] = messaging.sub_sock(stype, timeout=0.1)
socks[stype] = messaging.sub_sock(stype, poller=poller, timeout=100)
start_time_sec = time.monotonic()
while time.monotonic() - start_time_sec < duration_sec:
for esock in esocks:
events[esock] += messaging.drain_sock(esocks[esock])
# wait for sensors to come up
with Timeout(60, "sensors didn't come up"):
while len(poller.poll(250)) == 0:
pass
time.sleep(1)
for s in socks.values():
messaging.drain_sock_raw(s)
st = time.monotonic()
while time.monotonic() - st < duration_sec:
for s in socks:
events[s] += messaging.drain_sock(socks[s])
time.sleep(0.1)
assert sum(map(len, events.values())) != 0, "No sensor events collected!"
@ -101,8 +111,7 @@ class TestSensord(unittest.TestCase):
os.system("pkill -f ./_sensord")
try:
managed_processes["sensord"].start()
time.sleep(3)
cls.sample_secs = 10
cls.sample_secs = int(os.getenv("SAMPLE_SECS", "10"))
cls.events = read_sensor_events(cls.sample_secs)
# determine sensord's irq

Loading…
Cancel
Save