#!/usr/bin/env python3
import os
import pytest
import signal
import time
import unittest
from cereal import car
from openpilot . common . params import Params
import openpilot . selfdrive . manager . manager as manager
from openpilot . selfdrive . manager . process import ensure_running
from openpilot . selfdrive . manager . process_config import managed_processes
from openpilot . system . hardware import HARDWARE
os . environ [ ' FAKEUPLOAD ' ] = " 1 "
MAX_STARTUP_TIME = 3
BLACKLIST_PROCS = [ ' manage_athenad ' , ' pandad ' , ' pigeond ' ]
@pytest . mark . tici
class TestManager ( unittest . TestCase ) :
def setUp ( self ) :
HARDWARE . set_power_save ( False )
# ensure clean CarParams
params = Params ( )
params . clear_all ( )
def tearDown ( self ) :
manager . manager_cleanup ( )
def test_manager_prepare ( self ) :
os . environ [ ' PREPAREONLY ' ] = ' 1 '
manager . main ( )
def test_blacklisted_procs ( self ) :
# TODO: ensure there are blacklisted procs until we have a dedicated test
self . assertTrue ( len ( BLACKLIST_PROCS ) , " No blacklisted procs to test not_run " )
def test_startup_time ( self ) :
for _ in range ( 10 ) :
start = time . monotonic ( )
os . environ [ ' PREPAREONLY ' ] = ' 1 '
manager . main ( )
t = time . monotonic ( ) - start
assert t < MAX_STARTUP_TIME , f " startup took { t } s, expected < { MAX_STARTUP_TIME } s "
def test_clean_exit ( self ) :
"""
Ensure all processes exit cleanly when stopped .
"""
HARDWARE . set_power_save ( False )
manager . manager_init ( )
manager . manager_prepare ( )
CP = car . CarParams . new_message ( )
procs = ensure_running ( managed_processes . values ( ) , True , Params ( ) , CP , not_run = BLACKLIST_PROCS )
time . sleep ( 10 )
for p in procs :
with self . subTest ( proc = p . name ) :
state = p . get_process_state_msg ( )
self . assertTrue ( state . running , f " { p . name } not running " )
exit_code = p . stop ( retry = False )
self . assertNotIn ( p . name , BLACKLIST_PROCS , f " { p . name } was started " )
self . assertTrue ( exit_code is not None , f " { p . name } failed to exit " )
# TODO: interrupted blocking read exits with 1 in cereal. use a more unique return code
exit_codes = [ 0 , 1 ]
if p . sigkill :
exit_codes = [ - signal . SIGKILL ]
self . assertIn ( exit_code , exit_codes , f " { p . name } died with { exit_code } " )
if __name__ == " __main__ " :
unittest . main ( )