diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index a53d5778d4..35a9eaf192 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -42,6 +42,10 @@ from selfdrive.statsd import STATS_DIR from system.swaglog import SWAGLOG_DIR, cloudlog from system.version import get_commit, get_origin, get_short_branch, get_version + +# missing in pysocket +TCP_USER_TIMEOUT = 18 + ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) LOCAL_PORT_WHITELIST = {8022} @@ -141,6 +145,7 @@ def handle_long_poll(ws: WebSocket, exit_event: Optional[threading.Event]) -> No end_event = threading.Event() threads = [ + threading.Thread(target=ws_manage, args=(ws, end_event), name='ws_manage'), threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'), threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'), threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'), @@ -154,8 +159,7 @@ def handle_long_poll(ws: WebSocket, exit_event: Optional[threading.Event]) -> No for thread in threads: thread.start() try: - while not end_event.is_set(): - time.sleep(0.1) + while not end_event.wait(0.1): if exit_event is not None and exit_event.is_set(): end_event.set() except (KeyboardInterrupt, SystemExit): @@ -756,6 +760,25 @@ def ws_send(ws: WebSocket, end_event: threading.Event) -> None: end_event.set() +def ws_manage(ws: WebSocket, end_event: threading.Event) -> None: + params = Params() + onroad_prev = None + sock = ws.sock + + while True: + onroad = params.get_bool("IsOnroad") + if onroad != onroad_prev: + onroad_prev = onroad + + sock.setsockopt(socket.IPPROTO_TCP, TCP_USER_TIMEOUT, 16000 if onroad else 0) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 7 if onroad else 30) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 7 if onroad else 10) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2 if onroad else 3) + + if end_event.wait(5): + break + + def backoff(retries: int) -> int: return random.randrange(0, min(128, int(2 ** retries))) diff --git a/selfdrive/athena/tests/test_athenad_ping.py b/selfdrive/athena/tests/test_athenad_ping.py index d85ac60897..f768fbebaa 100755 --- a/selfdrive/athena/tests/test_athenad_ping.py +++ b/selfdrive/athena/tests/test_athenad_ping.py @@ -37,6 +37,9 @@ class TestAthenadPing(unittest.TestCase): def _received_ping(self) -> bool: return self._get_ping_time() is not None + def _set_onroad(self, onroad: bool) -> None: + self.params.put_bool("IsOnroad", onroad) + @classmethod def setUpClass(cls) -> None: cls.params = Params() @@ -63,8 +66,7 @@ class TestAthenadPing(unittest.TestCase): self.exit_event.set() self.athenad.join() - @unittest.skipIf(not TICI, "only run on desk") - def test_timeout(self) -> None: + def assertTimeout(self, reconnect_time: float) -> None: self.athenad.start() time.sleep(1) @@ -84,7 +86,7 @@ class TestAthenadPing(unittest.TestCase): wifi_radio(False) print("waiting for reconnect attempt") start_time = time.monotonic() - with Timeout(180, "no reconnect attempt"): + with Timeout(reconnect_time, "no reconnect attempt"): while not athenad.create_connection.called: time.sleep(0.1) print(f"reconnect attempt after {time.monotonic() - start_time:.2f}s") @@ -97,6 +99,16 @@ class TestAthenadPing(unittest.TestCase): time.sleep(0.1) print("ping received") + @unittest.skipIf(not TICI, "only run on desk") + def test_offroad(self) -> None: + self._set_onroad(False) + self.assertTimeout(100) # expect approx 90s + + @unittest.skipIf(not TICI, "only run on desk") + def test_onroad(self) -> None: + self._set_onroad(True) + self.assertTimeout(30) # expect 20-30s + if __name__ == "__main__": unittest.main()