move nose tests to unittest (#22665)

* move to unittest

* boardd loopback

* no more nose

* phone only

* fix panda decorator
pull/22689/head
Adeeb Shihadeh 4 years ago committed by GitHub
parent e556d3d3d3
commit b5960b9dc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      Jenkinsfile
  2. 144
      selfdrive/boardd/tests/test_boardd_loopback.py
  3. 15
      selfdrive/test/helpers.py
  4. 69
      selfdrive/ui/tests/test_soundd.py
  5. 65
      selfdrive/ui/tests/test_sounds.py

4
Jenkinsfile vendored

@ -170,8 +170,8 @@ pipeline {
steps { steps {
phone_steps("eon", [ phone_steps("eon", [
["build", "cd selfdrive/manager && ./build.py"], ["build", "cd selfdrive/manager && ./build.py"],
["test sounds", "nosetests -s selfdrive/ui/tests/test_sounds.py"], ["test sounds", "python selfdrive/ui/tests/test_soundd.py"],
["test boardd loopback", "nosetests -s selfdrive/boardd/tests/test_boardd_loopback.py"], ["test boardd loopback", "python selfdrive/boardd/tests/test_boardd_loopback.py"],
["test loggerd", "python selfdrive/loggerd/tests/test_loggerd.py"], ["test loggerd", "python selfdrive/loggerd/tests/test_loggerd.py"],
["test encoder", "python selfdrive/loggerd/tests/test_encoder.py"], ["test encoder", "python selfdrive/loggerd/tests/test_encoder.py"],
["test logcatd", "python selfdrive/logcatd/tests/test_logcatd_android.py"], ["test logcatd", "python selfdrive/logcatd/tests/test_logcatd_android.py"],

@ -2,96 +2,104 @@
import os import os
import random import random
import time import time
import unittest
from collections import defaultdict from collections import defaultdict
from functools import wraps from functools import wraps
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal import car from cereal import car
from common.basedir import BASEDIR
from common.params import Params from common.params import Params
from common.spinner import Spinner from common.spinner import Spinner
from common.timeout import Timeout from common.timeout import Timeout
from panda import Panda from panda import Panda
from selfdrive.boardd.boardd import can_list_to_can_capnp from selfdrive.boardd.boardd import can_list_to_can_capnp
from selfdrive.car import make_can_msg from selfdrive.car import make_can_msg
from selfdrive.test.helpers import with_processes from selfdrive.test.helpers import phone_only, with_processes
def reset_panda(fn): def reset_panda(f):
@wraps(fn) @wraps(f)
def wrapper(): def wrapper(*args, **kwargs):
p = Panda() p = Panda()
for i in [0, 1, 2, 0xFFFF]: for i in [0, 1, 2, 0xFFFF]:
p.can_clear(i) p.can_clear(i)
p.reset() p.reset()
p.close() p.close()
fn() f(*args, **kwargs)
return wrapper return wrapper
os.environ['STARTED'] = '1' os.environ['STARTED'] = '1'
os.environ['BOARDD_LOOPBACK'] = '1' os.environ['BOARDD_LOOPBACK'] = '1'
os.environ['BASEDIR'] = BASEDIR
class TestBoardd(unittest.TestCase):
@reset_panda
@with_processes(['pandad']) @classmethod
def test_boardd_loopback(): def setUpClass(cls):
# wait for boardd to init cls.spinner = Spinner()
spinner = Spinner()
time.sleep(2) @classmethod
def tearDownClass(cls):
with Timeout(60, "boardd didn't start"): cls.spinner.close()
sm = messaging.SubMaster(['pandaStates'])
while sm.rcv_frame['pandaStates'] < 1: @phone_only
sm.update(1000) @reset_panda
@with_processes(['pandad'])
# boardd blocks on CarVin and CarParams def test_loopback(self):
cp = car.CarParams.new_message() # wait for boardd to init
time.sleep(2)
safety_config = car.CarParams.SafetyConfig.new_message()
safety_config.safetyModel = car.CarParams.SafetyModel.allOutput with Timeout(60, "boardd didn't start"):
cp.safetyConfigs = [safety_config] sm = messaging.SubMaster(['pandaStates'])
while sm.rcv_frame['pandaStates'] < 1:
Params().put("CarVin", b"0"*17) sm.update(1000)
Params().put_bool("ControlsReady", True)
Params().put("CarParams", cp.to_bytes()) # boardd blocks on CarVin and CarParams
cp = car.CarParams.new_message()
sendcan = messaging.pub_sock('sendcan')
can = messaging.sub_sock('can', conflate=False, timeout=100) safety_config = car.CarParams.SafetyConfig.new_message()
safety_config.safetyModel = car.CarParams.SafetyModel.allOutput
time.sleep(1) cp.safetyConfigs = [safety_config]
n = 1000 params = Params()
for i in range(n): params.put("CarVin", b"0"*17)
spinner.update(f"boardd loopback {i}/{n}") params.put_bool("ControlsReady", True)
params.put("CarParams", cp.to_bytes())
sent_msgs = defaultdict(set)
for _ in range(random.randrange(10)): sendcan = messaging.pub_sock('sendcan')
to_send = [] can = messaging.sub_sock('can', conflate=False, timeout=100)
for __ in range(random.randrange(100)):
bus = random.randrange(3) time.sleep(1)
addr = random.randrange(1, 1<<29)
dat = bytes([random.getrandbits(8) for _ in range(random.randrange(1, 9))]) n = 1000
sent_msgs[bus].add((addr, dat)) for i in range(n):
to_send.append(make_can_msg(addr, dat, bus)) self.spinner.update(f"boardd loopback {i}/{n}")
sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))
sent_msgs = defaultdict(set)
max_recv = 10 for _ in range(random.randrange(10)):
while max_recv > 0 and any(len(sent_msgs[bus]) for bus in range(3)): to_send = []
recvd = messaging.drain_sock(can, wait_for_one=True) for __ in range(random.randrange(100)):
for msg in recvd: bus = random.randrange(3)
for m in msg.can: addr = random.randrange(1, 1<<29)
if m.src >= 128: dat = bytes([random.getrandbits(8) for _ in range(random.randrange(1, 9))])
k = (m.address, m.dat) sent_msgs[bus].add((addr, dat))
assert k in sent_msgs[m.src-128] to_send.append(make_can_msg(addr, dat, bus))
sent_msgs[m.src-128].discard(k) sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))
max_recv -= 1
max_recv = 10
# if a set isn't empty, messages got dropped while max_recv > 0 and any(len(sent_msgs[bus]) for bus in range(3)):
for bus in range(3): recvd = messaging.drain_sock(can, wait_for_one=True)
assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages" for msg in recvd:
for m in msg.can:
spinner.close() if m.src >= 128:
k = (m.address, m.dat)
assert k in sent_msgs[m.src-128]
sent_msgs[m.src-128].discard(k)
max_recv -= 1
# if a set isn't empty, messages got dropped
for bus in range(3):
assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages"
if __name__ == "__main__": if __name__ == "__main__":
test_boardd_loopback() unittest.main()

@ -1,10 +1,9 @@
import time import time
from functools import wraps from functools import wraps
from nose.tools import nottest
from selfdrive.hardware import PC from selfdrive.hardware import PC
from selfdrive.version import training_version, terms_version
from selfdrive.manager.process_config import managed_processes from selfdrive.manager.process_config import managed_processes
from selfdrive.version import training_version, terms_version
def set_params_enabled(): def set_params_enabled():
@ -17,11 +16,13 @@ def set_params_enabled():
params.put_bool("Passive", False) params.put_bool("Passive", False)
def phone_only(x): def phone_only(f):
if PC: @wraps(f)
return nottest(x) def wrap(self, *args, **kwargs):
else: if PC:
return x self.skipTest("This test is not meant to run on PC")
f(self, *args, **kwargs)
return wrap
def with_processes(processes, init_time=0, ignore_stopped=None): def with_processes(processes, init_time=0, ignore_stopped=None):

@ -0,0 +1,69 @@
#!/usr/bin/env python3
import subprocess
import time
import unittest
from cereal import log, car
import cereal.messaging as messaging
from selfdrive.test.helpers import phone_only, with_processes
# TODO: rewrite for unittest
from common.realtime import DT_CTRL
from selfdrive.hardware import HARDWARE
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
SOUNDS = {
# sound: total writes
AudibleAlert.none: 0,
AudibleAlert.chimeEngage: 173,
AudibleAlert.chimeDisengage: 173,
AudibleAlert.chimeError: 173,
AudibleAlert.chimePrompt: 173,
AudibleAlert.chimeWarning1: 163,
AudibleAlert.chimeWarning2: 216,
AudibleAlert.chimeWarning2Repeat: 470,
AudibleAlert.chimeWarningRepeat: 468,
}
def get_total_writes():
audio_flinger = subprocess.check_output('dumpsys media.audio_flinger', shell=True, encoding='utf-8').strip()
write_lines = [l for l in audio_flinger.split('\n') if l.strip().startswith('Total writes')]
return sum([int(l.split(':')[1]) for l in write_lines])
class TestSoundd(unittest.TestCase):
def test_sound_card_init(self):
assert HARDWARE.get_sound_card_online()
@phone_only
@with_processes(['soundd'])
def test_alert_sounds(self):
pm = messaging.PubMaster(['controlsState'])
# make sure they're all defined
alert_sounds = {v: k for k, v in car.CarControl.HUDControl.AudibleAlert.schema.enumerants.items()}
diff = set(SOUNDS.keys()).symmetric_difference(alert_sounds.keys())
assert len(diff) == 0, f"not all sounds defined in test: {diff}"
# wait for procs to init
time.sleep(1)
for sound, expected_writes in SOUNDS.items():
print(f"testing {alert_sounds[sound]}")
start_writes = get_total_writes()
for _ in range(int(9 / DT_CTRL)):
msg = messaging.new_message('controlsState')
msg.controlsState.alertSound = sound
msg.controlsState.alertType = str(sound)
msg.controlsState.alertText1 = "Testing Sounds"
msg.controlsState.alertText2 = f"playing {alert_sounds[sound]}"
msg.controlsState.alertSize = log.ControlsState.AlertSize.mid
pm.send('controlsState', msg)
time.sleep(DT_CTRL)
tolerance = (expected_writes % 100) * 2
actual_writes = get_total_writes() - start_writes
assert abs(expected_writes - actual_writes) <= tolerance, f"{alert_sounds[sound]}: expected {expected_writes} writes, got {actual_writes}"
if __name__ == "__main__":
unittest.main()

@ -1,65 +0,0 @@
#!/usr/bin/env python3
import time
import subprocess
from cereal import log, car
import cereal.messaging as messaging
from selfdrive.test.helpers import phone_only, with_processes
from common.realtime import DT_CTRL
from selfdrive.hardware import HARDWARE
AudibleAlert = car.CarControl.HUDControl.AudibleAlert
SOUNDS = {
# sound: total writes
AudibleAlert.none: 0,
AudibleAlert.chimeEngage: 173,
AudibleAlert.chimeDisengage: 173,
AudibleAlert.chimeError: 173,
AudibleAlert.chimePrompt: 173,
AudibleAlert.chimeWarning1: 163,
AudibleAlert.chimeWarning2: 216,
AudibleAlert.chimeWarning2Repeat: 470,
AudibleAlert.chimeWarningRepeat: 468,
}
def get_total_writes():
audio_flinger = subprocess.check_output('dumpsys media.audio_flinger', shell=True, encoding='utf-8').strip()
write_lines = [l for l in audio_flinger.split('\n') if l.strip().startswith('Total writes')]
return sum([int(l.split(':')[1]) for l in write_lines])
@phone_only
def test_sound_card_init():
assert HARDWARE.get_sound_card_online()
@phone_only
@with_processes(['soundd'])
def test_alert_sounds():
pm = messaging.PubMaster(['controlsState'])
# make sure they're all defined
alert_sounds = {v: k for k, v in car.CarControl.HUDControl.AudibleAlert.schema.enumerants.items()}
diff = set(SOUNDS.keys()).symmetric_difference(alert_sounds.keys())
assert len(diff) == 0, f"not all sounds defined in test: {diff}"
# wait for procs to init
time.sleep(1)
for sound, expected_writes in SOUNDS.items():
print(f"testing {alert_sounds[sound]}")
start_writes = get_total_writes()
for _ in range(int(9 / DT_CTRL)):
msg = messaging.new_message('controlsState')
msg.controlsState.alertSound = sound
msg.controlsState.alertType = str(sound)
msg.controlsState.alertText1 = "Testing Sounds"
msg.controlsState.alertText2 = f"playing {alert_sounds[sound]}"
msg.controlsState.alertSize = log.ControlsState.AlertSize.mid
pm.send('controlsState', msg)
time.sleep(DT_CTRL)
tolerance = (expected_writes % 100) * 2
actual_writes = get_total_writes() - start_writes
assert abs(expected_writes - actual_writes) <= tolerance, f"{alert_sounds[sound]}: expected {expected_writes} writes, got {actual_writes}"
Loading…
Cancel
Save