import os
import pytest
import signal
import time
from cereal import car
from openpilot . common . params import Params
import openpilot . system . manager . manager as manager
from openpilot . system . manager . process import ensure_running
from openpilot . system . manager . process_config import managed_processes , procs
from openpilot . system . hardware import HARDWARE
# Exported at import time, i.e. before any test in this module runs.
# NOTE(review): presumably read by a process under test so that no real
# upload happens -- confirm against the consumer of this variable. Key and
# value are kept exactly as found, including the padding spaces.
os.environ.update({' FAKEUPLOAD ': " 1 "})

# NOTE(review): not referenced by the tests visible here; presumably a
# start-up budget in seconds -- confirm before removing or changing it.
MAX_STARTUP_TIME = 3

# Process names handed to ensure_running(..., not_run=...) so they are not
# started by the clean-exit test.
BLACKLIST_PROCS = [
    ' manage_athenad ',
    ' pandad ',
    ' pigeond ',
]
class TestManager:
    """Tests for the manager entry points and the managed process list."""

    def setup_method(self):
        """Turn hardware power save off and wipe all params before each test."""
        HARDWARE.set_power_save(False)

        # ensure clean CarParams
        params = Params()
        params.clear_all()

    def teardown_method(self):
        """Run the manager cleanup after each test."""
        manager.manager_cleanup()

    def test_manager_prepare(self):
        """Run ``manager.main()`` with the prepare-only variable set.

        The previous value of the variable is restored afterwards, whether or
        not ``manager.main()`` raises, so the flag does not leak into other
        tests executed in the same process.
        """
        # NOTE(review): key/value are kept exactly as found (space-padded);
        # presumably this makes main() stop after its preparation step --
        # confirm against the code that reads the variable.
        key = ' PREPAREONLY '
        previous = os.environ.get(key)
        os.environ[key] = ' 1 '
        try:
            manager.main()
        finally:
            if previous is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = previous

    def test_duplicate_procs(self):
        """Check that ``procs`` and ``managed_processes`` have the same size.

        NOTE(review): ``managed_processes`` is presumably keyed by process
        name, so a repeated name would make it shorter than ``procs``.
        """
        assert len(procs) == len(managed_processes), " Duplicate process names "

    def test_blacklisted_procs(self):
        """Check that the blacklist used by the clean-exit test is not empty."""
        # TODO: ensure there are blacklisted procs until we have a dedicated test
        assert BLACKLIST_PROCS, " No blacklisted procs to test not_run "

    @pytest.mark.skip(" this test is flaky the way it ' s currently written, should be moved to test_onroad ")
    def test_clean_exit(self, subtests):
        """Ensure all processes exit cleanly when stopped.

        Currently skipped. Starts the managed processes (excluding
        ``BLACKLIST_PROCS``), waits, and then, per process and inside its own
        subtest, checks that it is running, stops it, and validates the exit
        code.
        """
        HARDWARE.set_power_save(False)
        manager.manager_init()

        CP = car.CarParams.new_message()
        # Named proc_list so the module-level ``procs`` import is not shadowed.
        # NOTE(review): the positional ``True`` is presumably the "started"
        # flag of ensure_running -- confirm against its signature.
        proc_list = ensure_running(
            managed_processes.values(), True, Params(), CP, not_run=BLACKLIST_PROCS
        )

        # Give the processes time to come up before inspecting them.
        time.sleep(10)

        for p in proc_list:
            with subtests.test(proc=p.name):
                state = p.get_process_state_msg()
                assert state.running, f" {p.name} not running "

                exit_code = p.stop(retry=False)

                assert p.name not in BLACKLIST_PROCS, f" {p.name} was started "
                assert exit_code is not None, f" {p.name} failed to exit "

                # TODO: interrupted blocking read exits with 1 in cereal. use a more unique return code
                # Processes flagged ``sigkill`` must report -SIGKILL instead.
                exit_codes = [-signal.SIGKILL] if p.sigkill else [0, 1]
                assert exit_code in exit_codes, f" {p.name} died with {exit_code} "