diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 5a02d45c43..5df7eec3f0 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -22,6 +22,7 @@ from typing import cast from collections.abc import Callable import requests +from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK from jsonrpc import JSONRPCResponseManager, dispatcher from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutException, create_connection) @@ -55,6 +56,11 @@ WS_FRAME_SIZE = 4096 DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds DEFAULT_UPLOAD_PRIORITY = 99 # higher number = lower priority +# https://bytesolutions.com/dscp-tos-cos-precedence-conversion-chart, +# https://en.wikipedia.org/wiki/Differentiated_services +UPLOAD_TOS = 0x20 # CS1, low priority background traffic +SSH_TOS = 0x90 # AF42, DSCP of 36/HDD_LINUX_AC_VI with the minimum delay flag + NetworkType = log.DeviceState.NetworkType UploadFileDict = dict[str, str | int | float | bool] @@ -63,6 +69,17 @@ UploadItemDict = dict[str, str | bool | int | float | dict[str, str]] UploadFilesToUrlResponse = dict[str, int | list[UploadItemDict] | list[str]] +class UploadTOSAdapter(HTTPAdapter): + def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs): + pool_kwargs["socket_options"] = [(socket.IPPROTO_IP, socket.IP_TOS, UPLOAD_TOS)] + super().init_poolmanager(connections, maxsize, block, **pool_kwargs) + + +UPLOAD_SESS = requests.Session() +UPLOAD_SESS.mount("http://", UploadTOSAdapter()) +UPLOAD_SESS.mount("https://", UploadTOSAdapter()) + + @dataclass class UploadFile: fn: str @@ -309,10 +326,10 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R stream = None try: stream, content_length = get_upload_stream(path, compress) - response = requests.put(upload_item.url, - data=CallbackReader(stream, callback, content_length) if callback else stream, - headers={**upload_item.headers, 'Content-Length': str(content_length)}, - timeout=30) + response = UPLOAD_SESS.put(upload_item.url, + data=CallbackReader(stream, callback, content_length) if callback else stream, + headers={**upload_item.headers, 'Content-Length': str(content_length)}, + timeout=30) return response finally: if stream: @@ -482,8 +499,7 @@ def startLocalProxy(global_end_event: threading.Event, remote_ws_uri: str, local enable_multithread=True) # Set TOS to keep connection responsive while under load. - # DSCP of 36/HDD_LINUX_AC_VI with the minimum delay flag - ws.sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 0x90) + ws.sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, SSH_TOS) ssock, csock = socket.socketpair() local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/system/athena/tests/test_athenad.py b/system/athena/tests/test_athenad.py index dd82325815..e559e0c05c 100644 --- a/system/athena/tests/test_athenad.py +++ b/system/athena/tests/test_athenad.py @@ -19,7 +19,7 @@ from cereal import messaging from openpilot.common.params import Params from openpilot.common.timeout import Timeout from openpilot.system.athena import athenad -from openpilot.system.athena.athenad import MAX_RETRY_COUNT, dispatcher +from openpilot.system.athena.athenad import MAX_RETRY_COUNT, UPLOAD_SESS, dispatcher from openpilot.system.athena.tests.helpers import HTTPRequestHandler, MockWebsocket, MockApi, EchoSocket from openpilot.selfdrive.test.helpers import http_server_context from openpilot.system.hardware.hw import Paths @@ -29,7 +29,7 @@ def seed_athena_server(host, port): with Timeout(2, 'HTTP Server seeding failed'): while True: try: - requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10) + UPLOAD_SESS.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10) break except requests.exceptions.ConnectionError: time.sleep(0.1) @@ -239,7 +239,7 @@ class TestAthenadMethods: @pytest.mark.parametrize("status,retry", [(500,True), (412,False)]) @with_upload_handler def test_upload_handler_retry(self, mocker, host, status, retry): - mock_put = mocker.patch('requests.put') + mock_put = mocker.patch('openpilot.system.athena.athenad.UPLOAD_SESS.put') mock_put.return_value.__enter__.return_value.status_code = status fn = self._create_file('qlog.zst') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)