diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index cc3ab5e95b..5089e1eb57 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -209,12 +209,14 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None: # Abort transfer if connection changed to metered after starting upload # or if athenad is shutting down to re-connect the websocket + print('callback!!!', cur / sz if sz else 1) sm.update(0) metered = sm['deviceState'].networkMetered if metered and (not item.allow_cellular): raise AbortTransferException if end_event.is_set(): + print('end event set, quitting uploading!!!') raise AbortTransferException cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) @@ -225,10 +227,12 @@ def upload_handler(end_event: threading.Event) -> None: tid = threading.get_ident() while not end_event.is_set(): + print('here!!!') cur_upload_items[tid] = None try: cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) + print('at top again', item) if item.id in cancelled_uploads: cancelled_uploads.remove(item.id) @@ -257,14 +261,19 @@ def upload_handler(end_event: threading.Event) -> None: cloudlog.event("athena.upload_handler.upload_start", fn=fn, sz=sz, network_type=network_type, metered=metered, retry_count=item.retry_count) response = _do_upload(item, partial(cb, sm, item, tid, end_event)) + print('response!', response) if response.status_code not in (200, 201, 401, 403, 412): cloudlog.event("athena.upload_handler.retry", status_code=response.status_code, fn=fn, sz=sz, network_type=network_type, metered=metered) retry_upload(tid, end_event) else: + print('success!!!') cloudlog.event("athena.upload_handler.success", fn=fn, sz=sz, network_type=network_type, metered=metered) + print('success 2!!!') + print('caching!!!') UploadQueueCache.cache(upload_queue) + print('cached!!!') except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError): cloudlog.event("athena.upload_handler.timeout", fn=fn, sz=sz, network_type=network_type, metered=metered) retry_upload(tid, end_event) diff --git a/selfdrive/athena/tests/helpers.py b/selfdrive/athena/tests/helpers.py index 87202665aa..115bfbe684 100644 --- a/selfdrive/athena/tests/helpers.py +++ b/selfdrive/athena/tests/helpers.py @@ -2,6 +2,7 @@ import http.server import threading import socket from functools import wraps +import time class MockResponse: @@ -61,8 +62,21 @@ class MockWebsocket(): class HTTPRequestHandler(http.server.SimpleHTTPRequestHandler): def do_PUT(self): + print('do_PUT') length = int(self.headers['Content-Length']) - self.rfile.read(length) + should_delay = self.headers.get('X-Delay-Upload') == 'true' + if not should_delay: + self.rfile.read(length) + else: + # time.sleep(10) + data = self.rfile.read(16 * 1024) + while data: + # for i in range(length): + data = self.rfile.read(16 * 1024) + print('reading') + time.sleep(0.01) + + print('after rfile') self.send_response(201, "Created") self.end_headers() diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index 2fecab1b1b..972f62a8a8 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -23,7 +23,7 @@ from cereal import messaging from openpilot.common.params import Params from openpilot.common.timeout import Timeout from openpilot.selfdrive.athena import athenad -from openpilot.selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher +from openpilot.selfdrive.athena.athenad import MAX_RETRY_COUNT, cb, dispatcher from openpilot.selfdrive.athena.tests.helpers import MockWebsocket, MockApi, EchoSocket, with_http_server from openpilot.system.hardware.hw import Paths from openpilot.selfdrive.athena.tests.helpers import HTTPRequestHandler @@ -49,7 +49,7 @@ def with_upload_handler(func): thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread.start() try: - return func(*args, **kwargs) + return func(*args, thread, end_event, **kwargs) finally: end_event.set() thread.join() @@ -93,7 +93,10 @@ class TestAthenadMethods(unittest.TestCase): def _wait_for_upload(): now = time.time() while time.time() - now < 5: - if athenad.upload_queue.qsize() == 0: + print('we are here', time.time() - now, athenad.upload_queue.qsize(), athenad.cur_upload_items) + time.sleep(0.1) + if athenad.upload_queue.qsize() == 0 and list(athenad.cur_upload_items.values())[0] is None: + print('BREAKING') break @staticmethod @@ -224,56 +227,91 @@ class TestAthenadMethods(unittest.TestCase): @with_mock_athena @with_upload_handler - def test_upload_handler(self, host): - fn = self._create_file('qlog.bz2') + # @mock.patch('openpilot.selfdrive.athena.athenad.cb', new_callable=lambda: mock.MagicMock(wraps=athenad.cb)) + @mock.patch('openpilot.selfdrive.athena.athenad.cb', autospec=True) + def test_upload_handler(self, host, thread, end_event, mock_cb): + print((host, end_event, mock_cb)) + tid = list(athenad.cur_upload_items)[0] + print(tid) + print('START WITH_UPLOAD_HANDLER TEST') + + slept = False + + def monitor_cb(*args, **kwargs): + nonlocal slept + print('monitor_cb', athenad.cur_upload_items) + _item = athenad.cur_upload_items[tid] + print('PROGRESS!', _item.progress) + if _item.progress > 0.2 and not slept: + print('SLEEPING 5s!') + end_event.set() + time.sleep(1) + slept = True + cb(*args, **kwargs) + + mock_cb.side_effect = monitor_cb + + fn = self._create_file('qlog.bz2', data=os.urandom(2 * 1024 * 1024)) + # item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={'X-Delay-Upload': 'true'}, created_at=int(time.time()*1000), id='', allow_cellular=True) item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) + # print(item) - athenad.upload_queue.put_nowait(item) - self._wait_for_upload() - time.sleep(0.1) - - # TODO: verify that upload actually succeeded - self.assertEqual(athenad.upload_queue.qsize(), 0) - - @parameterized.expand([(500, True), (412, False)]) - @with_mock_athena - @mock.patch('requests.put') - @with_upload_handler - def test_upload_handler_retry(self, status, retry, mock_put, host): - mock_put.return_value.status_code = status - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) athenad.upload_queue.put_nowait(item) + # while 1: + # time.sleep(0.01) + # print(athenad.upload_queue.qsize(), athenad.cur_upload_items) + # print() self._wait_for_upload() time.sleep(0.1) + time.sleep(2) + print('call count', mock_cb.call_count, len(athenad.cur_upload_items)) + print('is alive', thread.is_alive(), athenad.cur_upload_items) + self.assertEqual(athenad.cur_upload_items[tid].progress < 0.6) - self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) - - if retry: - self.assertEqual(athenad.upload_queue.get().retry_count, 1) - - @with_upload_handler - def test_upload_handler_timeout(self): - """When an upload times out or fails to connect it should be placed back in the queue""" - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) - item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT) - - athenad.upload_queue.put_nowait(item_no_retry) - self._wait_for_upload() - time.sleep(0.1) - - # Check that upload with retry count exceeded is not put back + # TODO: verify that upload actually succeeded self.assertEqual(athenad.upload_queue.qsize(), 0) - - athenad.upload_queue.put_nowait(item) - self._wait_for_upload() - time.sleep(0.1) - - # Check that upload item was put back in the queue with incremented retry count - self.assertEqual(athenad.upload_queue.qsize(), 1) - self.assertEqual(athenad.upload_queue.get().retry_count, 1) + self.assertEqual(athenad.cur_upload_items[tid], None) + + # @parameterized.expand([(500, True), (412, False)]) + # @with_mock_athena + # @mock.patch('requests.put') + # @with_upload_handler + # def test_upload_handler_retry(self, status, retry, mock_put, host): + # mock_put.return_value.status_code = status + # fn = self._create_file('qlog.bz2') + # item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) + # + # athenad.upload_queue.put_nowait(item) + # self._wait_for_upload() + # time.sleep(0.1) + # + # self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) + # + # if retry: + # self.assertEqual(athenad.upload_queue.get().retry_count, 1) + + # @with_upload_handler + # def test_upload_handler_timeout(self): + # """When an upload times out or fails to connect it should be placed back in the queue""" + # fn = self._create_file('qlog.bz2') + # item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) + # item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT) + # + # athenad.upload_queue.put_nowait(item_no_retry) + # self._wait_for_upload() + # time.sleep(0.1) + # + # # Check that upload with retry count exceeded is not put back + # self.assertEqual(athenad.upload_queue.qsize(), 0) + # + # athenad.upload_queue.put_nowait(item) + # self._wait_for_upload() + # time.sleep(0.1) + # + # # Check that upload item was put back in the queue with incremented retry count + # self.assertEqual(athenad.upload_queue.qsize(), 1) + # self.assertEqual(athenad.upload_queue.get().retry_count, 1) @with_upload_handler def test_cancelUpload(self):