|  |  | @ -1,4 +1,5 @@ | 
			
		
	
		
		
			
				
					
					|  |  |  | #!/usr/bin/env python3 |  |  |  | #!/usr/bin/env python3 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | from functools import partial | 
			
		
	
		
		
			
				
					
					|  |  |  | import json |  |  |  | import json | 
			
		
	
		
		
			
				
					
					|  |  |  | import multiprocessing |  |  |  | import multiprocessing | 
			
		
	
		
		
			
				
					
					|  |  |  | import os |  |  |  | import os | 
			
		
	
	
		
		
			
				
					|  |  | @ -17,11 +18,27 @@ from unittest import mock | 
			
		
	
		
		
			
				
					
					|  |  |  | from websocket import ABNF |  |  |  | from websocket import ABNF | 
			
		
	
		
		
			
				
					
					|  |  |  | from websocket._exceptions import WebSocketConnectionClosedException |  |  |  | from websocket._exceptions import WebSocketConnectionClosedException | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | from cereal import messaging | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | from openpilot.common.timeout import Timeout | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.athena import athenad |  |  |  | from openpilot.selfdrive.athena import athenad | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher |  |  |  | from openpilot.selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server |  |  |  | from openpilot.selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server | 
			
		
	
		
		
			
				
					
					|  |  |  | from cereal import messaging |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.system.hardware.hw import Paths |  |  |  | from openpilot.system.hardware.hw import Paths | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | from openpilot.selfdrive.athena.tests.helpers import HTTPRequestHandler | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | def seed_athena_server(host, port): | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   with Timeout(2, 'HTTP Server seeding failed'): | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     while True: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       try: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         requests.put(f'http://{host}:{port}/qlog.bz2', data='', timeout=10) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         break | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       except requests.exceptions.ConnectionError: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         time.sleep(0.1) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | with_mock_athena = partial(with_http_server, handler=HTTPRequestHandler, setup=seed_athena_server) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | class TestAthenadMethods(unittest.TestCase): |  |  |  | class TestAthenadMethods(unittest.TestCase): | 
			
		
	
	
		
		
			
				
					|  |  | @ -138,7 +155,7 @@ class TestAthenadMethods(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |       self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4]) |  |  |  |       self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4]) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   @parameterized.expand([(True,), (False,)]) |  |  |  |   @parameterized.expand([(True,), (False,)]) | 
			
		
	
		
		
			
				
					
					|  |  |  |   @with_http_server |  |  |  |   @with_mock_athena | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   def test_do_upload(self, compress, host): |  |  |  |   def test_do_upload(self, compress, host): | 
			
		
	
		
		
			
				
					
					|  |  |  |     # random bytes to ensure rather large object post-compression |  |  |  |     # random bytes to ensure rather large object post-compression | 
			
		
	
		
		
			
				
					
					|  |  |  |     fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) |  |  |  |     fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) | 
			
		
	
	
		
		
			
				
					|  |  | @ -152,7 +169,7 @@ class TestAthenadMethods(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |     resp = athenad._do_upload(item) |  |  |  |     resp = athenad._do_upload(item) | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.assertEqual(resp.status_code, 201) |  |  |  |     self.assertEqual(resp.status_code, 201) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   @with_http_server |  |  |  |   @with_mock_athena | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   def test_uploadFileToUrl(self, host): |  |  |  |   def test_uploadFileToUrl(self, host): | 
			
		
	
		
		
			
				
					
					|  |  |  |     fn = self._create_file('qlog.bz2') |  |  |  |     fn = self._create_file('qlog.bz2') | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | @ -163,7 +180,7 @@ class TestAthenadMethods(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.assertIsNotNone(resp['items'][0].get('id')) |  |  |  |     self.assertIsNotNone(resp['items'][0].get('id')) | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.assertEqual(athenad.upload_queue.qsize(), 1) |  |  |  |     self.assertEqual(athenad.upload_queue.qsize(), 1) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   @with_http_server |  |  |  |   @with_mock_athena | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   def test_uploadFileToUrl_duplicate(self, host): |  |  |  |   def test_uploadFileToUrl_duplicate(self, host): | 
			
		
	
		
		
			
				
					
					|  |  |  |     self._create_file('qlog.bz2') |  |  |  |     self._create_file('qlog.bz2') | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | @ -175,12 +192,12 @@ class TestAthenadMethods(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |     resp = dispatcher["uploadFileToUrl"]("qlog.bz2", url2, {}) |  |  |  |     resp = dispatcher["uploadFileToUrl"]("qlog.bz2", url2, {}) | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.assertEqual(resp, {'enqueued': 0, 'items': []}) |  |  |  |     self.assertEqual(resp, {'enqueued': 0, 'items': []}) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   @with_http_server |  |  |  |   @with_mock_athena | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   def test_uploadFileToUrl_does_not_exist(self, host): |  |  |  |   def test_uploadFileToUrl_does_not_exist(self, host): | 
			
		
	
		
		
			
				
					
					|  |  |  |     not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {}) |  |  |  |     not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {}) | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.assertEqual(not_exists_resp, {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']}) |  |  |  |     self.assertEqual(not_exists_resp, {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']}) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   @with_http_server |  |  |  |   @with_mock_athena | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   def test_upload_handler(self, host): |  |  |  |   def test_upload_handler(self, host): | 
			
		
	
		
		
			
				
					
					|  |  |  |     fn = self._create_file('qlog.bz2') |  |  |  |     fn = self._create_file('qlog.bz2') | 
			
		
	
		
		
			
				
					
					|  |  |  |     item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |  |  |  |     item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) | 
			
		
	
	
		
		
			
				
					|  |  | @ -199,7 +216,7 @@ class TestAthenadMethods(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |     finally: |  |  |  |     finally: | 
			
		
	
		
		
			
				
					
					|  |  |  |       end_event.set() |  |  |  |       end_event.set() | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   @with_http_server |  |  |  |   @with_mock_athena | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   @mock.patch('requests.put') |  |  |  |   @mock.patch('requests.put') | 
			
		
	
		
		
			
				
					
					|  |  |  |   def test_upload_handler_retry(self, host, mock_put): |  |  |  |   def test_upload_handler_retry(self, host, mock_put): | 
			
		
	
		
		
			
				
					
					|  |  |  |     for status, retry in ((500, True), (412, False)): |  |  |  |     for status, retry in ((500, True), (412, False)): | 
			
		
	
	
		
		
			
				
					|  |  | 
 |