@ -1,5 +1,6 @@
#!/usr/bin/env python3
from functools import partial , wraps
import pytest
from functools import wraps
import json
import multiprocessing
import os
@ -8,12 +9,9 @@ import shutil
import time
import threading
import queue
import unittest
from dataclasses import asdict , replace
from datetime import datetime , timedelta
from parameterized import parameterized
from unittest import mock
from websocket import ABNF
from websocket . _exceptions import WebSocketConnectionClosedException
@ -24,7 +22,7 @@ from openpilot.common.timeout import Timeout
from openpilot . selfdrive . athena import athenad
from openpilot . selfdrive . athena . athenad import MAX_RETRY_COUNT , dispatcher
from openpilot . selfdrive . athena . tests . helpers import HTTPRequestHandler , MockWebsocket , MockApi , EchoSocket
from openpilot . selfdrive . test . helpers import with_ http_server
from openpilot . selfdrive . test . helpers import http_server_context
from openpilot . system . hardware . hw import Paths
@ -37,10 +35,6 @@ def seed_athena_server(host, port):
except requests . exceptions . ConnectionError :
time . sleep ( 0.1 )
with_mock_athena = partial ( with_http_server , handler = HTTPRequestHandler , setup = seed_athena_server )
def with_upload_handler ( func ) :
@wraps ( func )
def wrapper ( * args , * * kwargs ) :
@ -54,15 +48,23 @@ def with_upload_handler(func):
thread . join ( )
return wrapper
@pytest . fixture
def mock_create_connection ( mocker ) :
return mocker . patch ( ' openpilot.selfdrive.athena.athenad.create_connection ' )
@pytest . fixture
def host ( ) :
with http_server_context ( handler = HTTPRequestHandler , setup = seed_athena_server ) as ( host , port ) :
yield f " http:// { host } : { port } "
class TestAthenadMethods ( unittest . TestCase ) :
class TestAthenadMethods :
@classmethod
def setUpClass ( cls ) :
def setup_c lass ( cls ) :
cls . SOCKET_PORT = 45454
athenad . Api = MockApi
athenad . LOCAL_PORT_WHITELIST = { cls . SOCKET_PORT }
def setUp ( self ) :
def setup_method ( self ) :
self . default_params = {
" DongleId " : " 0000000000000000 " ,
" GithubSshKeys " : b " ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private " , # noqa: E501
@ -109,8 +111,8 @@ class TestAthenadMethods(unittest.TestCase):
def test_echo ( self ) :
assert dispatcher [ " echo " ] ( " bob " ) == " bob "
def test_getM essage ( self ) :
with self . assertR aises( TimeoutError ) as _ :
def test_get_m essage ( self ) :
with pytest . r aises( TimeoutError ) as _ :
dispatcher [ " getMessage " ] ( " controlsState " )
end_event = multiprocessing . Event ( )
@ -133,7 +135,7 @@ class TestAthenadMethods(unittest.TestCase):
end_event . set ( )
p . join ( )
def test_listDataD irectory ( self ) :
def test_list_data_d irectory ( self ) :
route = ' 2021-03-29--13-32-47 '
segments = [ 0 , 1 , 2 , 3 , 11 ]
@ -143,69 +145,66 @@ class TestAthenadMethods(unittest.TestCase):
self . _create_file ( file )
resp = dispatcher [ " listDataDirectory " ] ( )
self . assertTrue ( resp , ' list empty! ' )
self . assertCountEqual ( resp , files )
assert resp , ' list empty! '
assert len ( resp ) == len ( files )
resp = dispatcher [ " listDataDirectory " ] ( f ' { route } --123 ' )
self . assertCountEqual ( resp , [ ] )
assert len ( resp ) == 0
prefix = f ' { route } '
expected = filter ( lambda f : f . startswith ( prefix ) , files )
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
self . assertTrue ( resp , ' list empty! ' )
self . assertCountEqual ( resp , expected )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
prefix = f ' { route } --1 '
expected = filter ( lambda f : f . startswith ( prefix ) , files )
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
self . assertTrue ( resp , ' list empty! ' )
self . assertCountEqual ( resp , expected )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
prefix = f ' { route } --1/ '
expected = filter ( lambda f : f . startswith ( prefix ) , files )
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
self . assertTrue ( resp , ' list empty! ' )
self . assertCountEqual ( resp , expected )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
prefix = f ' { route } --1/q '
expected = filter ( lambda f : f . startswith ( prefix ) , files )
expected = list ( filter ( lambda f : f . startswith ( prefix ) , files ) )
resp = dispatcher [ " listDataDirectory " ] ( prefix )
self . assertTrue ( resp , ' list empty! ' )
self . assertCountEqual ( resp , expected )
assert resp , ' list empty! '
assert len ( resp ) == len ( expected )
def test_strip_bz2_extension ( self ) :
fn = self . _create_file ( ' qlog.bz2 ' )
if fn . endswith ( ' .bz2 ' ) :
self . assertEqual ( athenad . strip_bz2_extension ( fn ) , fn [ : - 4 ] )
assert athenad . strip_bz2_extension ( fn ) == fn [ : - 4 ]
@parameterized . expand ( [ ( True , ) , ( False , ) ] )
@with_mock_athena
def test_do_upload ( self , compress , host ) :
@pytest . mark . parametrize ( " compress " , [ True , False ] )
def test_do_upload ( self , host , compress ) :
# random bytes to ensure rather large object post-compression
fn = self . _create_file ( ' qlog ' , data = os . urandom ( 10000 * 1024 ) )
upload_fn = fn + ( ' .bz2 ' if compress else ' ' )
item = athenad . UploadItem ( path = upload_fn , url = " http://localhost:1238 " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' )
with self . assertR aises( requests . exceptions . ConnectionError ) :
with pytest . r aises( requests . exceptions . ConnectionError ) :
athenad . _do_upload ( item )
item = athenad . UploadItem ( path = upload_fn , url = f " { host } /qlog.bz2 " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' )
resp = athenad . _do_upload ( item )
self . assertEqual ( resp . status_code , 201 )
assert resp . status_code == 201
@with_mock_athena
def test_uploadFileToUrl ( self , host ) :
def test_upload_file_to_url ( self , host ) :
fn = self . _create_file ( ' qlog.bz2 ' )
resp = dispatcher [ " uploadFileToUrl " ] ( " qlog.bz2 " , f " { host } /qlog.bz2 " , { } )
self . assertEqual ( resp [ ' enqueued ' ] , 1 )
self . assertNotIn ( ' failed ' , resp )
self . assertLessEqual ( { " path " : fn , " url " : f " { host } /qlog.bz2 " , " headers " : { } } . items ( ) , resp [ ' items ' ] [ 0 ] . items ( ) )
self . assertIsNotNone ( resp [ ' items ' ] [ 0 ] . get ( ' id ' ) )
self . assertEqual ( athenad . upload_queue . qsize ( ) , 1 )
@with_mock_athena
def test_uploadFileToUrl_duplicate ( self , host ) :
assert resp [ ' enqueued ' ] == 1
assert ' failed ' not in resp
assert { " path " : fn , " url " : f " { host } /qlog.bz2 " , " headers " : { } } . items ( ) < = resp [ ' items ' ] [ 0 ] . items ( )
assert resp [ ' items ' ] [ 0 ] . get ( ' id ' ) is not None
assert athenad . upload_queue . qsize ( ) == 1
def test_upload_file_to_url_duplicate ( self , host ) :
self . _create_file ( ' qlog.bz2 ' )
url1 = f " { host } /qlog.bz2?sig=sig1 "
@ -214,14 +213,12 @@ class TestAthenadMethods(unittest.TestCase):
# Upload same file again, but with different signature
url2 = f " { host } /qlog.bz2?sig=sig2 "
resp = dispatcher [ " uploadFileToUrl " ] ( " qlog.bz2 " , url2 , { } )
self . assertEqual ( resp , { ' enqueued ' : 0 , ' items ' : [ ] } )
assert resp == { ' enqueued ' : 0 , ' items ' : [ ] }
@with_mock_athena
def test_uploadFileToUrl_does_not_exist ( self , host ) :
def test_upload_file_to_url_does_not_exist ( self , host ) :
not_exists_resp = dispatcher [ " uploadFileToUrl " ] ( " does_not_exist.bz2 " , " http://localhost:1238 " , { } )
self . assertEqual ( not_exists_resp , { ' enqueued ' : 0 , ' items ' : [ ] , ' failed ' : [ ' does_not_exist.bz2 ' ] } )
assert not_exists_resp == { ' enqueued ' : 0 , ' items ' : [ ] , ' failed ' : [ ' does_not_exist.bz2 ' ] }
@with_mock_athena
@with_upload_handler
def test_upload_handler ( self , host ) :
fn = self . _create_file ( ' qlog.bz2 ' )
@ -233,13 +230,12 @@ class TestAthenadMethods(unittest.TestCase):
# TODO: verify that upload actually succeeded
# TODO: also check that end_event and metered network raises AbortTransferException
self . assertEqual ( athenad . upload_queue . qsize ( ) , 0 )
assert athenad . upload_queue . qsize ( ) == 0
@parameterized . expand ( [ ( 500 , True ) , ( 412 , False ) ] )
@with_mock_athena
@mock . patch ( ' requests.put ' )
@pytest . mark . parametrize ( " status,retry " , [ ( 500 , True ) , ( 412 , False ) ] )
@with_upload_handler
def test_upload_handler_retry ( self , status , retry , mock_put , host ) :
def test_upload_handler_retry ( self , mocker , host , status , retry ) :
mock_put = mocker . patch ( ' requests.put ' )
mock_put . return_value . status_code = status
fn = self . _create_file ( ' qlog.bz2 ' )
item = athenad . UploadItem ( path = fn , url = f " { host } /qlog.bz2 " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' , allow_cellular = True )
@ -248,10 +244,10 @@ class TestAthenadMethods(unittest.TestCase):
self . _wait_for_upload ( )
time . sleep ( 0.1 )
self . assertEqual ( athenad . upload_queue . qsize ( ) , 1 if retry else 0 )
assert athenad . upload_queue . qsize ( ) == ( 1 if retry else 0 )
if retry :
self . assertEqual ( athenad . upload_queue . get ( ) . retry_count , 1 )
assert athenad . upload_queue . get ( ) . retry_count == 1
@with_upload_handler
def test_upload_handler_timeout ( self ) :
@ -265,33 +261,33 @@ class TestAthenadMethods(unittest.TestCase):
time . sleep ( 0.1 )
# Check that upload with retry count exceeded is not put back
self . assertEqual ( athenad . upload_queue . qsize ( ) , 0 )
assert athenad . upload_queue . qsize ( ) == 0
athenad . upload_queue . put_nowait ( item )
self . _wait_for_upload ( )
time . sleep ( 0.1 )
# Check that upload item was put back in the queue with incremented retry count
self . assertEqual ( athenad . upload_queue . qsize ( ) , 1 )
self . assertEqual ( athenad . upload_queue . get ( ) . retry_count , 1 )
assert athenad . upload_queue . qsize ( ) == 1
assert athenad . upload_queue . get ( ) . retry_count == 1
@with_upload_handler
def test_cancelU pload ( self ) :
def test_cancel_u pload ( self ) :
item = athenad . UploadItem ( path = " qlog.bz2 " , url = " http://localhost:44444/qlog.bz2 " , headers = { } ,
created_at = int ( time . time ( ) * 1000 ) , id = ' id ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
dispatcher [ " cancelUpload " ] ( item . id )
self . assertIn ( item . id , athenad . cancelled_uploads )
assert item . id in athenad . cancelled_uploads
self . _wait_for_upload ( )
time . sleep ( 0.1 )
self . assertEqual ( athenad . upload_queue . qsize ( ) , 0 )
self . assertEqual ( len ( athenad . cancelled_uploads ) , 0 )
assert athenad . upload_queue . qsize ( ) == 0
assert len ( athenad . cancelled_uploads ) == 0
@with_upload_handler
def test_cancelE xpiry ( self ) :
def test_cancel_e xpiry ( self ) :
t_future = datetime . now ( ) - timedelta ( days = 40 )
ts = int ( t_future . strftime ( " %s " ) ) * 1000
@ -303,15 +299,14 @@ class TestAthenadMethods(unittest.TestCase):
self . _wait_for_upload ( )
time . sleep ( 0.1 )
self . assertEqual ( athenad . upload_queue . qsize ( ) , 0 )
assert athenad . upload_queue . qsize ( ) == 0
def test_listUploadQueueE mpty ( self ) :
def test_list_upload_queue_e mpty ( self ) :
items = dispatcher [ " listUploadQueue " ] ( )
self . assertEqual ( len ( items ) , 0 )
assert len ( items ) == 0
@with_http_server
@with_upload_handler
def test_listUploadQueueC urrent ( self , host : str ) :
def test_list_upload_queue_c urrent ( self , host : str ) :
fn = self . _create_file ( ' qlog.bz2 ' )
item = athenad . UploadItem ( path = fn , url = f " { host } /qlog.bz2 " , headers = { } , created_at = int ( time . time ( ) * 1000 ) , id = ' ' , allow_cellular = True )
@ -319,22 +314,22 @@ class TestAthenadMethods(unittest.TestCase):
self . _wait_for_upload ( )
items = dispatcher [ " listUploadQueue " ] ( )
self . assertEqual ( len ( items ) , 1 )
self . assertTrue ( items [ 0 ] [ ' current ' ] )
assert len ( items ) == 1
assert items [ 0 ] [ ' current ' ]
def test_listUploadQ ueue ( self ) :
def test_list_upload_q ueue ( self ) :
item = athenad . UploadItem ( path = " qlog.bz2 " , url = " http://localhost:44444/qlog.bz2 " , headers = { } ,
created_at = int ( time . time ( ) * 1000 ) , id = ' id ' , allow_cellular = True )
athenad . upload_queue . put_nowait ( item )
items = dispatcher [ " listUploadQueue " ] ( )
self . assertEqual ( len ( items ) , 1 )
self . assertDictEqual ( items [ 0 ] , asdict ( item ) )
self . assertFalse ( items [ 0 ] [ ' current ' ] )
assert len ( items ) == 1
assert items [ 0 ] == asdict ( item )
assert not items [ 0 ] [ ' current ' ]
athenad . cancelled_uploads . add ( item . id )
items = dispatcher [ " listUploadQueue " ] ( )
self . assertEqual ( len ( items ) , 0 )
assert len ( items ) == 0
def test_upload_queue_persistence ( self ) :
item1 = athenad . UploadItem ( path = " _ " , url = " _ " , headers = { } , created_at = int ( time . time ( ) ) , id = ' id1 ' )
@ -353,11 +348,10 @@ class TestAthenadMethods(unittest.TestCase):
athenad . upload_queue . queue . clear ( )
athenad . UploadQueueCache . initialize ( athenad . upload_queue )
self . assertEqual ( athenad . upload_queue . qsize ( ) , 1 )
self . assertDictEqual ( asdict ( athenad . upload_queue . queue [ - 1 ] ) , asdict ( item1 ) )
assert athenad . upload_queue . qsize ( ) == 1
assert asdict ( athenad . upload_queue . queue [ - 1 ] ) == asdict ( item1 )
@mock . patch ( ' openpilot.selfdrive.athena.athenad.create_connection ' )
def test_startLocalProxy ( self , mock_create_connection ) :
def test_start_local_proxy ( self , mock_create_connection ) :
end_event = threading . Event ( )
ws_recv = queue . Queue ( )
@ -380,21 +374,21 @@ class TestAthenadMethods(unittest.TestCase):
ws_recv . put_nowait ( WebSocketConnectionClosedException ( ) )
socket_thread . join ( )
def test_getSshAuthorizedK eys ( self ) :
def test_get_ssh_authorized_k eys ( self ) :
keys = dispatcher [ " getSshAuthorizedKeys " ] ( )
self . assertEqual ( keys , self . default_params [ " GithubSshKeys " ] . decode ( ' utf-8 ' ) )
assert keys == self . default_params [ " GithubSshKeys " ] . decode ( ' utf-8 ' )
def test_getGithubU sername ( self ) :
def test_get_github_u sername ( self ) :
keys = dispatcher [ " getGithubUsername " ] ( )
self . assertEqual ( keys , self . default_params [ " GithubUsername " ] . decode ( ' utf-8 ' ) )
assert keys == self . default_params [ " GithubUsername " ] . decode ( ' utf-8 ' )
def test_getV ersion ( self ) :
def test_get_v ersion ( self ) :
resp = dispatcher [ " getVersion " ] ( )
keys = [ " version " , " remote " , " branch " , " commit " ]
self . assertEqual ( list ( resp . keys ( ) ) , keys )
assert list ( resp . keys ( ) ) == keys
for k in keys :
self . assertIsI nstance( resp [ k ] , str , f " { k } is not a string " )
self . assertTrue ( len ( resp [ k ] ) > 0 , f " { k } has no value " )
assert isi nstance( resp [ k ] , str ) , f " { k } is not a string "
assert len ( resp [ k ] ) > 0 , f " { k } has no value "
def test_jsonrpc_handler ( self ) :
end_event = threading . Event ( )
@ -405,15 +399,15 @@ class TestAthenadMethods(unittest.TestCase):
# with params
athenad . recv_queue . put_nowait ( json . dumps ( { " method " : " echo " , " params " : [ " hello " ] , " jsonrpc " : " 2.0 " , " id " : 0 } ) )
resp = athenad . send_queue . get ( timeout = 3 )
self . assertDictEqual ( json . loads ( resp ) , { ' result ' : ' hello ' , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' } )
assert json . loads ( resp ) == { ' result ' : ' hello ' , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' }
# without params
athenad . recv_queue . put_nowait ( json . dumps ( { " method " : " getNetworkType " , " jsonrpc " : " 2.0 " , " id " : 0 } ) )
resp = athenad . send_queue . get ( timeout = 3 )
self . assertDictEqual ( json . loads ( resp ) , { ' result ' : 1 , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' } )
assert json . loads ( resp ) == { ' result ' : 1 , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' }
# log forwarding
athenad . recv_queue . put_nowait ( json . dumps ( { ' result ' : { ' success ' : 1 } , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' } ) )
resp = athenad . log_recv_queue . get ( timeout = 3 )
self . assertDictEqual ( json . loads ( resp ) , { ' result ' : { ' success ' : 1 } , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' } )
assert json . loads ( resp ) == { ' result ' : { ' success ' : 1 } , ' id ' : 0 , ' jsonrpc ' : ' 2.0 ' }
finally :
end_event . set ( )
thread . join ( )
@ -427,8 +421,4 @@ class TestAthenadMethods(unittest.TestCase):
# ensure the list is all logs except most recent
sl = athenad . get_logs_to_send_sorted ( )
self . assertListEqual ( sl , fl [ : - 1 ] )
if __name__ == ' __main__ ' :
unittest . main ( )
assert sl == fl [ : - 1 ]