# flake8: noqa
import os
# Exported before the project modules below are imported, so it is already in
# the environment when they load.
# NOTE(review): presumably tells the uploader to fake its uploads during this
# test run — confirm against whatever reads this variable.
os . environ [ ' FAKEUPLOAD ' ] = " 1 "
from common . apk import update_apks , start_frame , pm_apply_packages , android_packages
from common . params import Params
from common . testing import phone_only
from selfdrive . manager import manager_init , manager_prepare
from selfdrive . manager import start_managed_process , kill_managed_process , get_running
from selfdrive . manager import start_daemon_process
from functools import wraps
import json
import requests
import signal
import subprocess
import time
# Set to True by test_manager_prepare(). The with_processes/with_apks wrappers
# check it and call test_manager_prepare() themselves while it is still False.
DID_INIT = False
# must run first
@phone_only
def test_manager_prepare():
    """Initialise and prepare the manager, then record that setup is done.

    Calls ``manager_init()`` followed by ``manager_prepare()`` and sets the
    module-level ``DID_INIT`` flag, which the decorators in this file consult
    before deciding whether to run this setup themselves.
    """
    global DID_INIT

    manager_init()
    manager_prepare()

    DID_INIT = True
def with_processes(processes):
    """Build a decorator that runs a test while the given managed processes are up.

    Args:
        processes: names of the managed processes to start before the wrapped
            function runs and to kill afterwards.

    Returns:
        A decorator. The decorated callable takes no arguments. When called it
        runs ``test_manager_prepare()`` if ``DID_INIT`` is still false, starts
        every process, asserts each one has no exit code, runs the wrapped
        function, asserts the processes are still alive, and finally kills
        them and asserts that ``get_running()`` is empty.

    Raises:
        AssertionError: (from the decorated callable) if a process has an exit
            code before or after the wrapped function runs, or if anything is
            still reported as running after teardown.
    """
    def wrapper(func):
        @wraps(func)
        def wrap():
            if not DID_INIT:
                test_manager_prepare()

            # Plain loops: the calls are made for their side effects only.
            for name in processes:
                start_managed_process(name)

            try:
                # The start-up check lives under the try/finally so that a
                # process that failed to start does not leave the others
                # running without teardown.
                assert all(get_running()[name].exitcode is None for name in processes)

                # call the function
                func()

                # assert processes are still started
                assert all(get_running()[name].exitcode is None for name in processes)
            finally:
                # kill and assert all stopped
                for name in processes:
                    kill_managed_process(name)
                assert len(get_running()) == 0

        return wrap

    return wrapper
def with_apks():
    """Build a decorator that runs a test with the Android packages enabled.

    Returns:
        A decorator. The decorated callable takes no arguments. When called it
        runs ``test_manager_prepare()`` if ``DID_INIT`` is still false, updates
        the APKs, enables the packages, starts the frame, runs the wrapped
        function and asserts that ``pidof`` exits with status 0 for every
        entry of ``android_packages``. Afterwards the packages are disabled
        and ``pidof`` is expected to exit with status 1 for each of them.

    Raises:
        AssertionError: (from the decorated callable) with the package name as
            message when one of the ``pidof`` checks does not give the
            expected exit status.
    """
    def wrapper(func):
        @wraps(func)
        def wrap():
            if not DID_INIT:
                test_manager_prepare()

            update_apks()
            pm_apply_packages(' enable ')

            # Everything after the enable step is guarded, so the packages are
            # disabled again even when start_frame() or the test itself
            # raises (previously only the pidof checks were covered).
            try:
                start_frame()

                func()

                for package in android_packages:
                    apk_is_running = subprocess.call([" pidof ", package]) == 0
                    assert apk_is_running, package
            finally:
                pm_apply_packages(' disable ')
                for package in android_packages:
                    apk_is_not_running = subprocess.call([" pidof ", package]) == 1
                    assert apk_is_not_running, package

        return wrap

    return wrapper
@phone_only
@with_processes([
    ' loggerd ',
    ' logmessaged ',
    ' tombstoned ',
    ' proclogd ',
    ' logcatd ',
])
def test_logging():
    """Hold the logging processes up for one second.

    Starting, liveness checking and stopping the processes is handled by the
    ``with_processes`` decorator; the body only announces itself and waits.
    """
    hold_seconds = 1.0
    print(" LOGGING IS SET UP ")
    time.sleep(hold_seconds)
@phone_only
@with_processes([
    ' camerad ',
    ' modeld ',
    ' dmonitoringmodeld ',
])
def test_visiond():
    """Hold the camera and model processes up for five seconds.

    Starting, liveness checking and stopping the processes is handled by the
    ``with_processes`` decorator; the body only announces itself and waits.
    """
    hold_seconds = 5.0
    print(" VISIOND IS SET UP ")
    time.sleep(hold_seconds)
@phone_only
@with_processes([
    ' sensord ',
])
def test_sensord():
    """Hold the sensor process up for one second.

    Starting, liveness checking and stopping the process is handled by the
    ``with_processes`` decorator; the body only announces itself and waits.
    """
    hold_seconds = 1.0
    print(" SENSORS ARE SET UP ")
    time.sleep(hold_seconds)
@phone_only
@with_processes([
    ' ui ',
])
def test_ui():
    """Hold the UI process up for one second.

    Starting, liveness checking and stopping the process is handled by the
    ``with_processes`` decorator; the body only announces itself and waits.
    """
    hold_seconds = 1.0
    print(" RUNNING UI ")
    time.sleep(hold_seconds)
# will have one thing to upload if loggerd ran
# TODO: assert it actually uploaded
@phone_only
@with_processes([
    ' uploader ',
])
def test_uploader():
    """Hold the uploader process up for ten seconds.

    Only process liveness is checked (by the ``with_processes`` decorator);
    nothing here verifies that an upload took place.
    """
    hold_seconds = 10.0
    print(" UPLOADER ")
    time.sleep(hold_seconds)
@phone_only
def test_athena():
    """Check that manage_athenad keeps athenad alive and, optionally, the Athena API.

    Steps:
      1. Start the ``manage_athenad`` daemon and check that the pid stored in
         the ``AthenadPid`` param can be signalled.
      2. Wait until ``pgrep -P`` finds a child of that pid, send the child
         SIGINT and wait until a child is found again.
      3. If the ``COMMA_JWT`` environment variable is set, send JSON-RPC
         requests (``echo``, ``getSimInfo``, ``takeSnapshot`` and, with
         ``thermald`` running, ``getMessage`` for ``thermal``) and assert on
         the results. Without the variable the test returns after step 2.

    Cleanup always attempts to send SIGINT to manage_athenad and to the child
    that ``pgrep`` currently reports for it.

    Raises:
        AssertionError: if the manager pid is missing or dead, if no child
            appears within the timeout, or if a response lacks the expected
            content.
    """
    print(" ATHENA ")
    start_daemon_process(" manage_athenad ")
    params = Params()
    manage_athenad_pid = params.get(" AthenadPid ")
    assert manage_athenad_pid is not None
    try:
        # Signal 0 delivers nothing; the call only fails if the pid cannot be
        # signalled, which is used here as the liveness check.
        os.kill(int(manage_athenad_pid), 0)
    except OSError:
        # Raised explicitly so the failure does not depend on assert statements
        # being enabled.
        raise AssertionError(" manage_athenad is dead ")

    def expect_athena_starts(timeout=30):
        """Poll ``pgrep -P`` until the manager has a child; return pgrep's stripped output.

        Raises AssertionError once more than ``timeout`` seconds have passed
        without pgrep succeeding.
        """
        started_at = time.time()
        while True:
            try:
                return subprocess.check_output(
                    [" pgrep ", " -P ", manage_athenad_pid], encoding=" utf-8 "
                ).strip()
            except subprocess.CalledProcessError:
                if time.time() - started_at > timeout:
                    # An explicit raise keeps the timeout effective even when
                    # assert statements are compiled out; otherwise this loop
                    # would never end.
                    raise AssertionError(f" Athena did not start within { timeout } seconds ")
                time.sleep(0.5)

    def athena_post(payload, max_retries=5, wait=5):
        """POST ``payload`` as JSON to the Athena endpoint for this dongle.

        Any exception, including a response whose ``error`` field is truthy,
        counts as a failed attempt. After ``max_retries`` failed attempts the
        last exception is re-raised; otherwise the call is retried after
        ``wait`` seconds. Returns the decoded JSON response on success.
        """
        tries = 0
        while True:
            try:
                resp = requests.post(
                    " https://athena.comma.ai/ " + params.get(" DongleId ", encoding=" utf-8 "),
                    headers={
                        " Authorization ": " JWT " + os.getenv(" COMMA_JWT "),
                        " Content-Type ": " application/json ",
                    },
                    data=json.dumps(payload),
                    timeout=30,
                )
                resp_json = resp.json()
                if resp_json.get(' error '):
                    raise Exception(resp_json[' error '])
                return resp_json
            except Exception as e:
                tries += 1
                # ">=" also terminates for max_retries <= 0, and the check
                # comes before the sleep so the final failure is reported
                # without an extra pointless wait.
                if tries >= max_retries:
                    raise
                print(f' athena_post failed { e } . retrying... ')
                time.sleep(wait)

    def expect_athena_registers():
        """Send an ``echo`` request and assert that the same value comes back."""
        resp = athena_post({
            " method ": " echo ",
            " params ": [" hello "],
            " id ": 0,
            " jsonrpc ": " 2.0 ",
        }, max_retries=12, wait=5)
        assert resp.get(' result ') == " hello ", f' Athena failed to register ( { resp } ) '

    try:
        athenad_pid = expect_athena_starts()
        # kill athenad and ensure it is restarted (expect_athena_starts fails
        # with an AssertionError after its timeout if it is not)
        os.kill(int(athenad_pid), signal.SIGINT)
        expect_athena_starts()

        if not os.getenv(' COMMA_JWT '):
            print(' WARNING: COMMA_JWT env not set, will not test requests to athena.comma.ai ')
            return

        expect_athena_registers()

        print(" ATHENA: getSimInfo ")
        resp = athena_post({
            " method ": " getSimInfo ",
            " id ": 0,
            " jsonrpc ": " 2.0 ",
        })
        assert resp.get(' result '), resp
        assert ' sim_id ' in resp[' result '], resp[' result ']

        print(" ATHENA: takeSnapshot ")
        resp = athena_post({
            " method ": " takeSnapshot ",
            " id ": 0,
            " jsonrpc ": " 2.0 ",
        })
        assert resp.get(' result '), resp
        assert resp[' result '][' jpegBack '], resp[' result ']

        # Defined inline so thermald is only started for this one request.
        @with_processes([" thermald "])
        def test_athena_thermal():
            print(" ATHENA: getMessage(thermal) ")
            resp = athena_post({
                " method ": " getMessage ",
                " params ": {" service ": " thermal ", " timeout ": 5000},
                " id ": 0,
                " jsonrpc ": " 2.0 ",
            })
            assert resp.get(' result '), resp
            assert resp[' result '][' thermal '], resp[' result ']

        test_athena_thermal()
    finally:
        try:
            athenad_pid = subprocess.check_output(
                [" pgrep ", " -P ", manage_athenad_pid], encoding=" utf-8 "
            ).strip()
        except subprocess.CalledProcessError:
            # pgrep found no child of the manager.
            athenad_pid = None

        # Manager first, then its child. Each signal is attempted on its own
        # so that a failure on the manager does not skip the child.
        for pid in (manage_athenad_pid, athenad_pid):
            if pid is None:
                continue
            try:
                os.kill(int(pid), signal.SIGINT)
            except OSError:
                # Best-effort cleanup: the process may already be gone.
                pass
# TODO: re-enable when jenkins test has /data/pythonpath -> /data/openpilot
# @phone_only
# @with_apks()
# def test_apks():
# print("APKS")
# time.sleep(14.0)