athena: lower upload DHCP priority (#35648)

* try this

* draft

* works

* caps

* should only need https

* fix test
pull/35613/head
Shane Smiskol 2 weeks ago committed by GitHub
parent 684f770435
commit 2b8a956f41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 28
      system/athena/athenad.py
  2. 6
      system/athena/tests/test_athenad.py

@ -22,6 +22,7 @@ from typing import cast
from collections.abc import Callable
import requests
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from jsonrpc import JSONRPCResponseManager, dispatcher
from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutException,
create_connection)
@ -55,6 +56,11 @@ WS_FRAME_SIZE = 4096
DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds
DEFAULT_UPLOAD_PRIORITY = 99 # higher number = lower priority
# https://bytesolutions.com/dscp-tos-cos-precedence-conversion-chart,
# https://en.wikipedia.org/wiki/Differentiated_services
UPLOAD_TOS = 0x20 # CS1, low priority background traffic
SSH_TOS = 0x90 # AF42, DSCP of 36/HDD_LINUX_AC_VI with the minimum delay flag
NetworkType = log.DeviceState.NetworkType
UploadFileDict = dict[str, str | int | float | bool]
@ -63,6 +69,17 @@ UploadItemDict = dict[str, str | bool | int | float | dict[str, str]]
UploadFilesToUrlResponse = dict[str, int | list[UploadItemDict] | list[str]]
class UploadTOSAdapter(HTTPAdapter):
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
pool_kwargs["socket_options"] = [(socket.IPPROTO_IP, socket.IP_TOS, UPLOAD_TOS)]
super().init_poolmanager(connections, maxsize, block, **pool_kwargs)
UPLOAD_SESS = requests.Session()
UPLOAD_SESS.mount("http://", UploadTOSAdapter())
UPLOAD_SESS.mount("https://", UploadTOSAdapter())
@dataclass
class UploadFile:
fn: str
@ -309,10 +326,10 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R
stream = None
try:
stream, content_length = get_upload_stream(path, compress)
response = requests.put(upload_item.url,
data=CallbackReader(stream, callback, content_length) if callback else stream,
headers={**upload_item.headers, 'Content-Length': str(content_length)},
timeout=30)
response = UPLOAD_SESS.put(upload_item.url,
data=CallbackReader(stream, callback, content_length) if callback else stream,
headers={**upload_item.headers, 'Content-Length': str(content_length)},
timeout=30)
return response
finally:
if stream:
@ -482,8 +499,7 @@ def startLocalProxy(global_end_event: threading.Event, remote_ws_uri: str, local
enable_multithread=True)
# Set TOS to keep connection responsive while under load.
# DSCP of 36/HDD_LINUX_AC_VI with the minimum delay flag
ws.sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 0x90)
ws.sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, SSH_TOS)
ssock, csock = socket.socketpair()
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

@ -19,7 +19,7 @@ from cereal import messaging
from openpilot.common.params import Params
from openpilot.common.timeout import Timeout
from openpilot.system.athena import athenad
from openpilot.system.athena.athenad import MAX_RETRY_COUNT, dispatcher
from openpilot.system.athena.athenad import MAX_RETRY_COUNT, UPLOAD_SESS, dispatcher
from openpilot.system.athena.tests.helpers import HTTPRequestHandler, MockWebsocket, MockApi, EchoSocket
from openpilot.selfdrive.test.helpers import http_server_context
from openpilot.system.hardware.hw import Paths
@ -29,7 +29,7 @@ def seed_athena_server(host, port):
with Timeout(2, 'HTTP Server seeding failed'):
while True:
try:
requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10)
UPLOAD_SESS.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10)
break
except requests.exceptions.ConnectionError:
time.sleep(0.1)
@ -239,7 +239,7 @@ class TestAthenadMethods:
@pytest.mark.parametrize("status,retry", [(500,True), (412,False)])
@with_upload_handler
def test_upload_handler_retry(self, mocker, host, status, retry):
mock_put = mocker.patch('requests.put')
mock_put = mocker.patch('openpilot.system.athena.athenad.UPLOAD_SESS.put')
mock_put.return_value.__enter__.return_value.status_code = status
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

Loading…
Cancel
Save