import pytest
from functools import wraps
import json
import multiprocessing
import os
import requests
import shutil
import time
import threading
import queue
from dataclasses import asdict , replace
from datetime import datetime , timedelta
from websocket import ABNF
from websocket . _exceptions import WebSocketConnectionClosedException
from cereal import messaging
from openpilot . common . params import Params
from openpilot . common . timeout import Timeout
from openpilot . system . athena import athenad
from openpilot . system . athena . athenad import MAX_RETRY_COUNT , dispatcher
from openpilot . system . athena . tests . helpers import HTTPRequestHandler , MockWebsocket , MockApi , EchoSocket
from openpilot . selfdrive . test . helpers import http_server_context
from openpilot . system . hardware . hw import Paths
def seed_athena_server ( host , port ) :
with Timeout ( 2 , ' HTTP Server seeding failed ' ) :
while True :
try :
requests . put ( f ' http:// { host } : { port } /qlog.zst ' , data = ' ' , timeout = 10 )
break
except requests . exceptions . ConnectionError :
time . sleep ( 0.1 )
def with_upload_handler ( func ) :
@wraps ( func )
def wrapper ( * args , * * kwargs ) :
end_event = threading . Event ( )
thread = threading . Thread ( target = athenad . upload_handler , args = ( end_event , ) )
thread . start ( )
try :
return func ( * args , * * kwargs )
finally :
end_event . set ( )
thread . join ( )
return wrapper
@pytest . fixture
def mock_create_connection ( mocker ) :
return mocker . patch ( ' openpilot.system.athena.athenad.create_connection ' )
@pytest . fixture
def host ( ) :
with http_server_context ( handler = HTTPRequestHandler , setup = seed_athena_server ) as ( host , port ) :
yield f " http:// { host } : { port } "
class TestAthenadMethods :
@classmethod
def setup_class ( cls ) :
cls . SOCKET_PORT = 45454
athenad . Api = MockApi
athenad . LOCAL_PORT_WHITELIST = { cls . SOCKET_PORT }
def setup_method ( self ) :
self . default_params = {
" DongleId " : " 0000000000000000 " ,
" GithubSshKeys " : b " ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private " , # noqa: E501
" GithubUsername " : b " commaci " ,
" AthenadUploadQueue " : ' [] ' ,
}
self . params = Params ( )
for k , v in self . default_params . items ( ) :
self . params . put ( k , v )
self . params . put_bool ( " GsmMetered " , True )
athenad . upload_queue = queue . Queue ( )
athenad . cur_upload_items . clear ( )
athenad . cancelled_uploads . clear ( )
for i in os . listdir ( Paths . log_root ( ) ) :
p = os . path . join ( Paths . log_root ( ) , i )
if os . path . isdir ( p ) :
shutil . rmtree ( p )
else :
os . unlink ( p )
# *** test helpers ***
@staticmethod
def _wait_for_upload ( ) :
now = time . time ( )
while time . time ( ) - now < 5 :
if athenad . upload_queue . qsize ( ) == 0 :
break
@staticmethod
def _create_file ( file : str , parent : str = None , data : bytes = b ' ' ) - > str :
fn = os . path . join ( Paths . log_root ( ) if parent is None else parent , file )
os . makedirs ( os . path . dirname ( fn ) , exist_ok = True )
with open ( fn , ' wb ' ) as f :
f . write ( data )
return fn
# *** test cases ***
def test_echo ( self ) :
assert dispatcher [ " echo " ] ( " bob " ) == " bob "
def test_get_message ( self ) :
with pytest . raises ( TimeoutError ) as _ :
dispatcher [ " getMessage " ] ( " controlsState " )
end_event = multiprocessing . Event ( )
pub_sock = messaging . pub_sock ( " deviceState " )
def send_deviceState ( ) :
while not end_event . is_set ( ) :
msg = messaging . new_message ( ' deviceState ' )
pub_sock . send ( msg . to_bytes ( ) )
time . sleep ( 0.01 )
p = multiprocessing . Process ( target = send_deviceState )
p . start ( )
time . sleep ( 0.1 )
try :
deviceState = dispatcher [ " getMessage " ] ( " deviceState " )
assert deviceState [ ' deviceState ' ]
finally :
end_event . set ( )
p . join ( )
def test_list_data_directory ( self ) :
route = ' 2021-03-29--13-32-47 '
segments = [ 0 , 1 , 2 , 3 , 11 ]
filenames = [ ' qlog ' , ' qcamera.ts ' , ' rlog ' , ' fcamera.hevc ' , ' ecamera.hevc ' , ' dcamera.hevc ' ]
files = [ f ' { route } -- { s } / { f } ' for s in segments for f in filenames ]
for file in files :
self . _create_file ( file )
resp = dispatcher [ " listDataDirectory " ] ( )
assert resp , ' list empty! '
assert len ( resp ) == len ( files )
resp = dispatcher [ " listDataDirectory " ] ( f ' { route } --123 ' )
assert len ( resp ) == 0
prefix = f ' { route } '
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
prefix = f ' { route } --1 '
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
prefix = f ' { route } --1/ '
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
prefix = f ' { route } --1/q '
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
def test_strip_extension ( self ) :
# any requested log file with an invalid extension won't return as existing
fn = self . _create_file ( ' qlog.bz2 ' )
if fn . endswith ( ' .bz2 ' ) :
assert athenad . strip_zst_extension ( fn ) == fn
fn = self . _create_file ( ' qlog.zst ' )
if fn . endswith ( ' .zst ' ) :
assert athenad . strip_zst_extension ( fn ) == fn [ : - 4 ]
@pytest . mark . parametrize ( " compress " , [ True , False ] )
def test_do_upload ( self , host , compress ) :
# random bytes to ensure rather large object post-compression
fn = self . _create_file ( ' qlog ' , data = os . urandom ( 10000 * 1024 ) )
upload_fn = fn + ( ' .zst ' if compress else ' ' )
item = athenad . UploadItem ( path = upload_fn , url = " http://localhost:1238 " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' )
with pytest . raises ( requests . exceptions . ConnectionError ) :
athenad . _do_upload ( item )
item = athenad . UploadItem ( path = upload_fn , url = f " { host } /qlog.zst " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' )
resp = athenad . _do_upload ( item )
assert resp . status_code == 201
def test_upload_file_to_url ( self , host ) :
fn = self . _create_file ( ' qlog.zst ' )
resp = dispatcher [ " uploadFileToUrl " ] ( " qlog.zst " , f " { host } /qlog.zst " , { } )
assert resp [ ' enqueued ' ] == 1
assert ' failed ' not in resp
assert { " path " : fn , " url " : f " { host } /qlog.zst " , " headers " : { } } . items ( ) < = resp [ ' items ' ] [ 0 ] . items ( )
assert resp [ ' items ' ] [ 0 ] . get ( ' id ' ) is not None
assert athenad . upload_queue . qsize ( ) == 1
def test_upload_file_to_url_duplicate ( self , host ) :
self . _create_file ( ' qlog.zst ' )
url1 = f " { host } /qlog.zst?sig=sig1 "
dispatcher [ " uploadFileToUrl " ] ( " qlog.zst " , url1 , { } )
# Upload same file again, but with different signature
url2 = f " { host } /qlog.zst?sig=sig2 "
resp = dispatcher [ " uploadFileToUrl " ] ( " qlog.zst " , url2 , { } )
assert resp == { ' enqueued ' : 0 , ' items ' : [ ] }
def test_upload_file_to_url_does_not_exist ( self , host ) :
not_exists_resp = dispatcher [ " uploadFileToUrl " ] ( " does_not_exist.zst " , " http://localhost:1238 " , { } )
assert not_exists_resp == { ' enqueued ' : 0 , ' items ' : [ ] , ' failed ' : [ ' does_not_exist.zst ' ] }
@with_upload_handler
def test_upload_handler ( self , host ) :
fn = self . _create_file ( ' qlog.zst ' )
item = athenad . UploadItem ( path = fn , url = f " { host } /qlog.zst " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
self . _wait_for_upload ( )
time . sleep ( 0.1 )
# TODO: verify that upload actually succeeded
# TODO: also check that end_event and metered network raises AbortTransferException
assert athenad . upload_queue . qsize ( ) == 0
@pytest . mark . parametrize ( " status,retry " , [ ( 500 , True ) , ( 412 , False ) ] )
@with_upload_handler
def test_upload_handler_retry ( self , mocker , host , status , retry ) :
mock_put = mocker . patch ( ' requests.put ' )
mock_put . return_value . status_code = status
fn = self . _create_file ( ' qlog.zst ' )
item = athenad . UploadItem ( path = fn , url = f " { host } /qlog.zst " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
self . _wait_for_upload ( )
time . sleep ( 0.1 )
assert athenad . upload_queue . qsize ( ) == ( 1 if retry else 0 )
if retry :
assert athenad . upload_queue . get ( ) . retry_count == 1
@with_upload_handler
def test_upload_handler_timeout ( self ) :
""" When an upload times out or fails to connect it should be placed back in the queue """
fn = self . _create_file ( ' qlog.zst ' )
item = athenad . UploadItem ( path = fn , url = " http://localhost:44444/qlog.zst " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' , allow_cellular = True )
item_no_retry = replace ( item , retry_count = MAX_RETRY_COUNT )
athenad . upload_queue . put_nowait ( item_no_retry )
self . _wait_for_upload ( )
time . sleep ( 0.1 )
# Check that upload with retry count exceeded is not put back
assert athenad . upload_queue . qsize ( ) == 0
athenad . upload_queue . put_nowait ( item )
self . _wait_for_upload ( )
time . sleep ( 0.1 )
# Check that upload item was put back in the queue with incremented retry count
assert athenad . upload_queue . qsize ( ) == 1
assert athenad . upload_queue . get ( ) . retry_count == 1
@with_upload_handler
def test_cancel_upload ( self ) :
item = athenad . UploadItem ( path = " qlog.zst " , url = " http://localhost:44444/qlog.zst " , headers = { } ,
created_at = int ( time . time ( ) * 1000 ) , id = ' id ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
dispatcher [ " cancelUpload " ] ( item . id )
assert item . id in athenad . cancelled_uploads
self . _wait_for_upload ( )
time . sleep ( 0.1 )
assert athenad . upload_queue . qsize ( ) == 0
assert len ( athenad . cancelled_uploads ) == 0
@with_upload_handler
def test_cancel_expiry ( self ) :
t_future = datetime . now ( ) - timedelta ( days = 40 )
ts = int ( t_future . strftime ( " %s " ) ) * 1000
# Item that would time out if actually uploaded
fn = self . _create_file ( ' qlog.zst ' )
item = athenad . UploadItem ( path = fn , url = " http://localhost:44444/qlog.zst " , headers = { } , created_at = ts , id = ' ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
self . _wait_for_upload ( )
time . sleep ( 0.1 )
assert athenad . upload_queue . qsize ( ) == 0
def test_list_upload_queue_empty ( self ) :
items = dispatcher [ " listUploadQueue " ] ( )
assert len ( items ) == 0
@with_upload_handler
def test_list_upload_queue_current ( self , host : str ) :
fn = self . _create_file ( ' qlog.zst ' )
item = athenad . UploadItem ( path = fn , url = f " { host } /qlog.zst " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
self . _wait_for_upload ( )
items = dispatcher [ " listUploadQueue " ] ( )
assert len ( items ) == 1
assert items [ 0 ] [ ' current ' ]
def test_list_upload_queue ( self ) :
item = athenad . UploadItem ( path = " qlog.zst " , url = " http://localhost:44444/qlog.zst " , headers = { } ,
created_at = int ( time . time ( ) * 1000 ) , id = ' id ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
items = dispatcher [ " listUploadQueue " ] ( )
assert len ( items ) == 1
assert items [ 0 ] == asdict ( item )
assert not items [ 0 ] [ ' current ' ]
athenad . cancelled_uploads . add ( item . id )
items = dispatcher [ " listUploadQueue " ] ( )
assert len ( items ) == 0
def test_upload_queue_persistence ( self ) :
item1 = athenad . UploadItem ( path = " _ " , url = " _ " , headers = { } , created_at = int ( time . time ( ) ) , id = ' id1 ' )
item2 = athenad . UploadItem ( path = " _ " , url = " _ " , headers = { } , created_at = int ( time . time ( ) ) , id = ' id2 ' )
athenad . upload_queue . put_nowait ( item1 )
athenad . upload_queue . put_nowait ( item2 )
# Ensure cancelled items are not persisted
athenad . cancelled_uploads . add ( item2 . id )
# serialize item
athenad . UploadQueueCache . cache ( athenad . upload_queue )
# deserialize item
athenad . upload_queue . queue . clear ( )
athenad . UploadQueueCache . initialize ( athenad . upload_queue )
assert athenad . upload_queue . qsize ( ) == 1
assert asdict ( athenad . upload_queue . queue [ - 1 ] ) == asdict ( item1 )
def test_start_local_proxy ( self , mock_create_connection ) :
end_event = threading . Event ( )
ws_recv = queue . Queue ( )
ws_send = queue . Queue ( )
mock_ws = MockWebsocket ( ws_recv , ws_send )
mock_create_connection . return_value = mock_ws
echo_socket = EchoSocket ( self . SOCKET_PORT )
socket_thread = threading . Thread ( target = echo_socket . run )
socket_thread . start ( )
athenad . startLocalProxy ( end_event , ' ws://localhost:1234 ' , self . SOCKET_PORT )
ws_recv . put_nowait ( b ' ping ' )
try :
recv = ws_send . get ( timeout = 5 )
assert recv == ( b ' ping ' , ABNF . OPCODE_BINARY ) , recv
finally :
# signal websocket close to athenad.ws_proxy_recv
ws_recv . put_nowait ( WebSocketConnectionClosedException ( ) )
socket_thread . join ( )
def test_get_ssh_authorized_keys ( self ) :
keys = dispatcher [ " getSshAuthorizedKeys " ] ( )
assert keys == self . default_params [ " GithubSshKeys " ] . decode ( ' utf-8 ' )
def test_get_github_username ( self ) :
keys = dispatcher [ " getGithubUsername " ] ( )
assert keys == self . default_params [ " GithubUsername " ] . decode ( ' utf-8 ' )
def test_get_version ( self ) :
resp = dispatcher [ " getVersion " ] ( )
keys = [ " version " , " remote " , " branch " , " commit " ]
assert list ( resp . keys ( ) ) == keys
for k in keys :
assert isinstance ( resp [ k ] , str ) , f " { k } is not a string "
assert len ( resp [ k ] ) > 0 , f " { k } has no value "
def test_jsonrpc_handler ( self ) :
end_event = threading . Event ( )
thread = threading . Thread ( target = athenad . jsonrpc_handler , args = ( end_event , ) )
thread . daemon = True
thread . start ( )
try :
# with params
athenad . recv_queue . put_nowait ( json . dumps ( { " method " : " echo " , " params " : [ " hello " ] , " jsonrpc " : " 2.0 " , " id " : 0 } ) )
resp = athenad . send_queue . get ( timeout = 3 )
assert json . loads ( resp ) == { ' result ' : ' hello ' , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' }
# without params
athenad . recv_queue . put_nowait ( json . dumps ( { " method " : " getNetworkType " , " jsonrpc " : " 2.0 " , " id " : 0 } ) )
resp = athenad . send_queue . get ( timeout = 3 )
assert json . loads ( resp ) == { ' result ' : 1 , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' }
# log forwarding
athenad . recv_queue . put_nowait ( json . dumps ( { ' result ' : { ' success ' : 1 } , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' } ) )
resp = athenad . log_recv_queue . get ( timeout = 3 )
assert json . loads ( resp ) == { ' result ' : { ' success ' : 1 } , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' }
finally :
end_event . set ( )
thread . join ( )
def test_get_logs_to_send_sorted ( self ) :
fl = list ( )
for i in range ( 10 ) :
file = f ' swaglog. { i : 010 } '
self . _create_file ( file , Paths . swaglog_root ( ) )
fl . append ( file )
# ensure the list is all logs except most recent
sl = athenad . get_logs_to_send_sorted ( )
assert sl == fl [ : - 1 ]