# flake8: noqa import os os.environ['FAKEUPLOAD'] = "1" from common.apk import update_apks, start_frame, pm_apply_packages, android_packages from common.params import Params from common.testing import phone_only from selfdrive.manager import manager_init, manager_prepare from selfdrive.manager import start_managed_process, kill_managed_process, get_running from selfdrive.manager import start_daemon_process from functools import wraps import json import requests import signal import subprocess import time DID_INIT = False # must run first @phone_only def test_manager_prepare(): global DID_INIT manager_init() manager_prepare() DID_INIT = True def with_processes(processes): def wrapper(func): @wraps(func) def wrap(): if not DID_INIT: test_manager_prepare() # start and assert started [start_managed_process(p) for p in processes] assert all(get_running()[name].exitcode is None for name in processes) # call the function try: func() # assert processes are still started assert all(get_running()[name].exitcode is None for name in processes) finally: # kill and assert all stopped [kill_managed_process(p) for p in processes] assert len(get_running()) == 0 return wrap return wrapper def with_apks(): def wrapper(func): @wraps(func) def wrap(): if not DID_INIT: test_manager_prepare() update_apks() pm_apply_packages('enable') start_frame() func() try: for package in android_packages: apk_is_running = (subprocess.call(["pidof", package]) == 0) assert apk_is_running, package finally: pm_apply_packages('disable') for package in android_packages: apk_is_not_running = (subprocess.call(["pidof", package]) == 1) assert apk_is_not_running, package return wrap return wrapper @phone_only @with_processes(['loggerd', 'logmessaged', 'tombstoned', 'proclogd', 'logcatd']) def test_logging(): print("LOGGING IS SET UP") time.sleep(1.0) @phone_only @with_processes(['camerad', 'modeld', 'monitoringd']) def test_visiond(): print("VISIOND IS SET UP") time.sleep(5.0) @phone_only @with_processes(['sensord']) def test_sensord(): print("SENSORS ARE SET UP") time.sleep(1.0) @phone_only @with_processes(['ui']) def test_ui(): print("RUNNING UI") time.sleep(1.0) # will have one thing to upload if loggerd ran # TODO: assert it actually uploaded @phone_only @with_processes(['uploader']) def test_uploader(): print("UPLOADER") time.sleep(10.0) # @phone_only # def test_athena(): # print("ATHENA") # start_daemon_process("manage_athenad") # params = Params() # manage_athenad_pid = params.get("AthenadPid") # assert manage_athenad_pid is not None # try: # os.kill(int(manage_athenad_pid), 0) # # process is running # except OSError: # assert False, "manage_athenad is dead" # def expect_athena_starts(timeout=30): # now = time.time() # athenad_pid = None # while athenad_pid is None: # try: # athenad_pid = subprocess.check_output(["pgrep", "-P", manage_athenad_pid], encoding="utf-8").strip() # return athenad_pid # except subprocess.CalledProcessError: # if time.time() - now > timeout: # assert False, f"Athena did not start within {timeout} seconds" # time.sleep(0.5) # def athena_post(payload, max_retries=5, wait=5): # tries = 0 # while 1: # try: # resp = requests.post( # "https://athena.comma.ai/" + params.get("DongleId", encoding="utf-8"), # headers={ # "Authorization": "JWT " + os.getenv("COMMA_JWT"), # "Content-Type": "application/json" # }, # data=json.dumps(payload), # timeout=30 # ) # resp_json = resp.json() # if resp_json.get('error'): # raise Exception(resp_json['error']) # return resp_json # except Exception as e: # time.sleep(wait) # tries += 1 # if tries == max_retries: # raise # else: # print(f'athena_post failed {e}. retrying...') # def expect_athena_registers(): # resp = athena_post({ # "method": "echo", # "params": ["hello"], # "id": 0, # "jsonrpc": "2.0" # }, max_retries=12, wait=5) # assert resp.get('result') == "hello", f'Athena failed to register ({resp})' # try: # athenad_pid = expect_athena_starts() # # kill athenad and ensure it is restarted (check_output will throw if it is not) # os.kill(int(athenad_pid), signal.SIGINT) # expect_athena_starts() # if not os.getenv('COMMA_JWT'): # print('WARNING: COMMA_JWT env not set, will not test requests to athena.comma.ai') # return # expect_athena_registers() # print("ATHENA: getSimInfo") # resp = athena_post({ # "method": "getSimInfo", # "id": 0, # "jsonrpc": "2.0" # }) # assert resp.get('result'), resp # assert 'sim_id' in resp['result'], resp['result'] # print("ATHENA: takeSnapshot") # resp = athena_post({ # "method": "takeSnapshot", # "id": 0, # "jsonrpc": "2.0" # }) # assert resp.get('result'), resp # assert resp['result']['jpegBack'], resp['result'] # @with_processes(["thermald"]) # def test_athena_thermal(): # print("ATHENA: getMessage(thermal)") # resp = athena_post({ # "method": "getMessage", # "params": {"service": "thermal", "timeout": 5000}, # "id": 0, # "jsonrpc": "2.0" # }) # assert resp.get('result'), resp # assert resp['result']['thermal'], resp['result'] # test_athena_thermal() # finally: # try: # athenad_pid = subprocess.check_output(["pgrep", "-P", manage_athenad_pid], encoding="utf-8").strip() # except subprocess.CalledProcessError: # athenad_pid = None # try: # os.kill(int(manage_athenad_pid), signal.SIGINT) # os.kill(int(athenad_pid), signal.SIGINT) # except (OSError, TypeError): # pass # TODO: re-enable when jenkins test has /data/pythonpath -> /data/openpilot # @phone_only # @with_apks() # def test_apks(): # print("APKS") # time.sleep(14.0)