"""Tests for the athenad dispatcher methods, upload queue handling and local proxy."""
import pytest
from functools import wraps
import json
import multiprocessing
import os
import requests
import shutil
import time
import threading
import queue
from dataclasses import asdict, replace
from datetime import datetime, timedelta

from websocket import ABNF
from websocket._exceptions import WebSocketConnectionClosedException

from cereal import messaging

from openpilot.common.params import Params
from openpilot.common.timeout import Timeout
from openpilot.system.athena import athenad
from openpilot.system.athena.athenad import MAX_RETRY_COUNT, dispatcher
from openpilot.system.athena.tests.helpers import HTTPRequestHandler, MockWebsocket, MockApi, EchoSocket
from openpilot.selfdrive.test.helpers import http_server_context
from openpilot.system.hardware.hw import Paths


def seed_athena_server(host, port):
  """Retry an empty PUT to ``/qlog.zst`` on the test server until it connects.

  Connection errors are retried every 0.1 s; the whole loop runs inside a
  2-second ``Timeout`` context labelled 'HTTP Server seeding failed'.
  """
  with Timeout(2, 'HTTP Server seeding failed'):
    while True:
      try:
        requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10)
        break
      except requests.exceptions.ConnectionError:
        time.sleep(0.1)


def with_upload_handler(func):
  """Decorator: run ``athenad.upload_handler`` in a thread while ``func`` executes.

  The handler thread receives an ``end_event`` which is set, and the thread
  joined, once ``func`` returns or raises.
  """
  @wraps(func)
  def wrapper(*args, **kwargs):
    end_event = threading.Event()
    thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
    thread.start()
    try:
      return func(*args, **kwargs)
    finally:
      # always stop the handler, even when the test body fails
      end_event.set()
      thread.join()
  return wrapper


@pytest.fixture
def mock_create_connection(mocker):
  """Patch ``athenad.create_connection`` and return the mock."""
  return mocker.patch('openpilot.system.athena.athenad.create_connection')


@pytest.fixture
def host():
  """Yield the base URL of a local HTTP server seeded via ``seed_athena_server``."""
  with http_server_context(handler=HTTPRequestHandler, setup=seed_athena_server) as (host, port):
    yield f"http://{host}:{port}"


class TestAthenadMethods:
  """Exercises athenad dispatcher methods, the upload handler and queue persistence."""

  @classmethod
  def setup_class(cls):
    """Swap in ``MockApi`` and whitelist the port used by the local proxy test."""
    cls.SOCKET_PORT = 45454
    athenad.Api = MockApi
    athenad.LOCAL_PORT_WHITELIST = {cls.SOCKET_PORT}

  def setup_method(self):
    """Reset params, athenad upload state and the log root before each test."""
    self.default_params = {
      "DongleId": "0000000000000000",
      "GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private", # noqa: E501
      "GithubUsername": b"commaci",
      "AthenadUploadQueue": '[]',
    }

    self.params = Params()
    for k, v in self.default_params.items():
      self.params.put(k, v)
    self.params.put_bool("GsmMetered", True)

    # start every test with an empty queue and no in-flight / cancelled uploads
    athenad.upload_queue = queue.Queue()
    athenad.cur_upload_items.clear()
    athenad.cancelled_uploads.clear()

    # wipe everything a previous test left in the log root
    for entry in os.listdir(Paths.log_root()):
      p = os.path.join(Paths.log_root(), entry)
      if os.path.isdir(p):
        shutil.rmtree(p)
      else:
        os.unlink(p)

  # *** test helpers ***

  @staticmethod
  def _wait_for_upload():
    """Block until ``athenad.upload_queue`` is empty, or 5 seconds have elapsed."""
    # monotonic clock: the deadline is unaffected by wall-clock adjustments
    deadline = time.monotonic() + 5
    # NOTE(review): this polls without sleeping; test_list_upload_queue_current
    # appears to depend on returning right after the item is dequeued, so
    # confirm before adding a sleep here.
    while time.monotonic() < deadline:
      if athenad.upload_queue.qsize() == 0:
        break

  @staticmethod
  def _create_file(file: str, parent: str = None, data: bytes = b'') -> str:
    """Write ``data`` to ``file`` under ``parent`` (default: the log root).

    Missing parent directories are created. Returns the full path written.
    """
    fn = os.path.join(Paths.log_root() if parent is None else parent, file)
    os.makedirs(os.path.dirname(fn), exist_ok=True)
    with open(fn, 'wb') as f:
      f.write(data)
    return fn

  # *** test cases ***

  def test_echo(self):
    """``echo`` returns its argument unchanged."""
    assert dispatcher["echo"]("bob") == "bob"

  def test_get_message(self):
    """``getMessage`` raises TimeoutError for an unpublished service and returns a published one."""
    # nothing in this test publishes controlsState
    with pytest.raises(TimeoutError):
      dispatcher["getMessage"]("controlsState")

    end_event = multiprocessing.Event()

    pub_sock = messaging.pub_sock("deviceState")

    def send_deviceState():
      while not end_event.is_set():
        msg = messaging.new_message('deviceState')
        pub_sock.send(msg.to_bytes())
        time.sleep(0.01)

    p = multiprocessing.Process(target=send_deviceState)
    p.start()
    time.sleep(0.1)
    try:
      deviceState = dispatcher["getMessage"]("deviceState")
      assert deviceState['deviceState']
    finally:
      end_event.set()
      p.join()

  def test_list_data_directory(self):
    """``listDataDirectory`` lists all files, and only matching ones when given a prefix."""
    route = '2021-03-29--13-32-47'
    segments = [0, 1, 2, 3, 11]

    filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
    files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
    for file in files:
      self._create_file(file)

    resp = dispatcher["listDataDirectory"]()
    assert resp, 'list empty!'
    assert len(resp) == len(files)

    # a segment that was never created matches nothing
    resp = dispatcher["listDataDirectory"](f'{route}--123')
    assert len(resp) == 0

    # progressively narrower prefixes: route, segment(s), segment dir, file stem
    for prefix in (f'{route}', f'{route}--1', f'{route}--1/', f'{route}--1/q'):
      expected = [f for f in files if f.startswith(prefix)]
      resp = dispatcher["listDataDirectory"](prefix)
      assert resp, 'list empty!'
      assert len(resp) == len(expected)

  def test_strip_extension(self):
    """``strip_zst_extension`` removes a trailing ``.zst`` and leaves other names untouched."""
    # any requested log file with an invalid extension won't return as existing
    fn = self._create_file('qlog.bz2')
    assert athenad.strip_zst_extension(fn) == fn

    fn = self._create_file('qlog.zst')
    assert athenad.strip_zst_extension(fn) == fn[:-4]

  @pytest.mark.parametrize("compress", [True, False])
  def test_do_upload(self, host, compress):
    """``_do_upload`` fails on an unreachable URL and gets 201 from the test server."""
    # random bytes to ensure rather large object post-compression
    fn = self._create_file('qlog', data=os.urandom(10000 * 1024))

    upload_fn = fn + ('.zst' if compress else '')
    item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
    with pytest.raises(requests.exceptions.ConnectionError):
      athenad._do_upload(item)

    item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='')
    resp = athenad._do_upload(item)
    assert resp.status_code == 201

  def test_upload_file_to_url(self, host):
    """``uploadFileToUrl`` enqueues an existing file and reports the queued item."""
    fn = self._create_file('qlog.zst')

    resp = dispatcher["uploadFileToUrl"]("qlog.zst", f"{host}/qlog.zst", {})
    assert resp['enqueued'] == 1
    assert 'failed' not in resp
    assert {"path": fn, "url": f"{host}/qlog.zst", "headers": {}}.items() <= resp['items'][0].items()
    assert resp['items'][0].get('id') is not None
    assert athenad.upload_queue.qsize() == 1

  def test_upload_file_to_url_duplicate(self, host):
    """A second request for the same file is not enqueued, even with a different URL signature."""
    self._create_file('qlog.zst')

    url1 = f"{host}/qlog.zst?sig=sig1"
    dispatcher["uploadFileToUrl"]("qlog.zst", url1, {})

    # Upload same file again, but with different signature
    url2 = f"{host}/qlog.zst?sig=sig2"
    resp = dispatcher["uploadFileToUrl"]("qlog.zst", url2, {})
    assert resp == {'enqueued': 0, 'items': []}

  def test_upload_file_to_url_does_not_exist(self, host):
    """A missing file is reported under ``failed`` and nothing is enqueued."""
    not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.zst", "http://localhost:1238", {})
    assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.zst']}

  @with_upload_handler
  def test_upload_handler(self, host):
    """The upload handler drains an item queued for the test server."""
    fn = self._create_file('qlog.zst')
    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

    athenad.upload_queue.put_nowait(item)
    self._wait_for_upload()
    time.sleep(0.1)

    # TODO: verify that upload actually succeeded
    # TODO: also check that end_event and metered network raises AbortTransferException
    assert athenad.upload_queue.qsize() == 0

  @pytest.mark.parametrize("status,retry", [(500,True), (412,False)])
  @with_upload_handler
  def test_upload_handler_retry(self, mocker, host, status, retry):
    """A 500 response re-queues the item with ``retry_count`` 1; a 412 response does not."""
    mock_put = mocker.patch('requests.put')
    mock_put.return_value.__enter__.return_value.status_code = status

    fn = self._create_file('qlog.zst')
    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

    athenad.upload_queue.put_nowait(item)
    self._wait_for_upload()
    time.sleep(0.1)

    assert athenad.upload_queue.qsize() == (1 if retry else 0)

    if retry:
      assert athenad.upload_queue.get().retry_count == 1

  @with_upload_handler
  def test_upload_handler_timeout(self):
    """When an upload times out or fails to connect it should be placed back in the queue"""
    fn = self._create_file('qlog.zst')
    item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
    item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT)

    athenad.upload_queue.put_nowait(item_no_retry)
    self._wait_for_upload()
    time.sleep(0.1)

    # Check that upload with retry count exceeded is not put back
    assert athenad.upload_queue.qsize() == 0

    athenad.upload_queue.put_nowait(item)
    self._wait_for_upload()
    time.sleep(0.1)

    # Check that upload item was put back in the queue with incremented retry count
    assert athenad.upload_queue.qsize() == 1
    assert athenad.upload_queue.get().retry_count == 1

  @with_upload_handler
  def test_cancel_upload(self):
    """A cancelled item leaves both the queue and ``cancelled_uploads`` once handled."""
    item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
                              created_at=int(time.time()*1000), id='id', allow_cellular=True)
    athenad.upload_queue.put_nowait(item)
    dispatcher["cancelUpload"](item.id)

    assert item.id in athenad.cancelled_uploads

    self._wait_for_upload()
    time.sleep(0.1)

    assert athenad.upload_queue.qsize() == 0
    assert len(athenad.cancelled_uploads) == 0

  @with_upload_handler
  def test_cancel_expiry(self):
    """An item created 40 days ago is removed from the queue by the handler."""
    t_past = datetime.now() - timedelta(days=40)
    # timestamp() replaces the non-portable strftime("%s"); both treat the
    # naive datetime as local time, and created_at is in milliseconds
    ts = int(t_past.timestamp()) * 1000

    # Item that would time out if actually uploaded
    fn = self._create_file('qlog.zst')
    item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=ts, id='', allow_cellular=True)

    athenad.upload_queue.put_nowait(item)
    self._wait_for_upload()
    time.sleep(0.1)

    assert athenad.upload_queue.qsize() == 0

  def test_list_upload_queue_empty(self):
    """``listUploadQueue`` returns nothing for an empty queue."""
    items = dispatcher["listUploadQueue"]()
    assert len(items) == 0

  @with_upload_handler
  def test_list_upload_queue_current(self, host: str):
    """An item picked up by the handler is listed with ``current`` set."""
    fn = self._create_file('qlog.zst')
    item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

    athenad.upload_queue.put_nowait(item)
    self._wait_for_upload()

    items = dispatcher["listUploadQueue"]()
    assert len(items) == 1
    assert items[0]['current']

  def test_list_upload_queue(self):
    """Queued items are listed as not current; cancelled items are omitted."""
    item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
                              created_at=int(time.time()*1000), id='id', allow_cellular=True)
    athenad.upload_queue.put_nowait(item)

    items = dispatcher["listUploadQueue"]()
    assert len(items) == 1
    assert items[0] == asdict(item)
    assert not items[0]['current']

    athenad.cancelled_uploads.add(item.id)
    items = dispatcher["listUploadQueue"]()
    assert len(items) == 0

  def test_upload_queue_persistence(self):
    """``UploadQueueCache`` round-trips the queue, dropping cancelled items."""
    item1 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id1')
    item2 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id2')

    athenad.upload_queue.put_nowait(item1)
    athenad.upload_queue.put_nowait(item2)

    # Ensure canceled items are not persisted
    athenad.cancelled_uploads.add(item2.id)

    # serialize item
    athenad.UploadQueueCache.cache(athenad.upload_queue)

    # deserialize item
    athenad.upload_queue.queue.clear()
    athenad.UploadQueueCache.initialize(athenad.upload_queue)

    assert athenad.upload_queue.qsize() == 1
    assert asdict(athenad.upload_queue.queue[-1]) == asdict(item1)

  def test_start_local_proxy(self, mock_create_connection):
    """Bytes fed to the mocked websocket come back via the echo socket as a binary frame."""
    end_event = threading.Event()

    ws_recv = queue.Queue()
    ws_send = queue.Queue()
    mock_ws = MockWebsocket(ws_recv, ws_send)
    mock_create_connection.return_value = mock_ws

    echo_socket = EchoSocket(self.SOCKET_PORT)
    socket_thread = threading.Thread(target=echo_socket.run)
    socket_thread.start()

    athenad.startLocalProxy(end_event, 'ws://localhost:1234', self.SOCKET_PORT)

    ws_recv.put_nowait(b'ping')
    try:
      recv = ws_send.get(timeout=5)
      assert recv == (b'ping', ABNF.OPCODE_BINARY), recv
    finally:
      # signal websocket close to athenad.ws_proxy_recv
      ws_recv.put_nowait(WebSocketConnectionClosedException())
      socket_thread.join()

  def test_get_ssh_authorized_keys(self):
    """``getSshAuthorizedKeys`` returns the ``GithubSshKeys`` param decoded as UTF-8."""
    keys = dispatcher["getSshAuthorizedKeys"]()
    assert keys == self.default_params["GithubSshKeys"].decode('utf-8')

  def test_get_github_username(self):
    """``getGithubUsername`` returns the ``GithubUsername`` param decoded as UTF-8."""
    keys = dispatcher["getGithubUsername"]()
    assert keys == self.default_params["GithubUsername"].decode('utf-8')

  def test_get_version(self):
    """``getVersion`` returns exactly four non-empty string fields, in order."""
    resp = dispatcher["getVersion"]()
    keys = ["version", "remote", "branch", "commit"]
    assert list(resp.keys()) == keys
    for k in keys:
      assert isinstance(resp[k], str), f"{k} is not a string"
      assert len(resp[k]) > 0, f"{k} has no value"

  def test_jsonrpc_handler(self):
    """``jsonrpc_handler`` answers requests on ``send_queue`` and forwards results to ``log_recv_queue``."""
    end_event = threading.Event()
    thread = threading.Thread(target=athenad.jsonrpc_handler, args=(end_event,))
    thread.daemon = True
    thread.start()
    try:
      # with params
      athenad.recv_queue.put_nowait(json.dumps({"method": "echo", "params": ["hello"], "jsonrpc": "2.0", "id": 0}))
      resp = athenad.send_queue.get(timeout=3)
      assert json.loads(resp) == {'result': 'hello', 'id': 0, 'jsonrpc': '2.0'}
      # without params
      athenad.recv_queue.put_nowait(json.dumps({"method": "getNetworkType", "jsonrpc": "2.0", "id": 0}))
      resp = athenad.send_queue.get(timeout=3)
      assert json.loads(resp) == {'result': 1, 'id': 0, 'jsonrpc': '2.0'}
      # log forwarding
      athenad.recv_queue.put_nowait(json.dumps({'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'}))
      resp = athenad.log_recv_queue.get(timeout=3)
      assert json.loads(resp) == {'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'}
    finally:
      end_event.set()
      thread.join()

  def test_get_logs_to_send_sorted(self):
    """``get_logs_to_send_sorted`` returns every swaglog file except the last one created."""
    fl = [f'swaglog.{i:010}' for i in range(10)]
    for file in fl:
      self._create_file(file, Paths.swaglog_root())

    # ensure the list is all logs except most recent
    sl = athenad.get_logs_to_send_sorted()
    assert sl == fl[:-1]