diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index 3ae95430e2..cc7cdd87ad 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -4,37 +4,39 @@ import hashlib import io import json import os -import sys import queue import random import select import socket +import subprocess +import sys +import tempfile import threading import time -import tempfile -import subprocess from collections import namedtuple +from datetime import datetime from functools import partial from typing import Any, Dict import requests from jsonrpc import JSONRPCResponseManager, dispatcher -from websocket import ABNF, WebSocketTimeoutException, WebSocketException, create_connection +from websocket import (ABNF, WebSocketException, WebSocketTimeoutException, + create_connection) import cereal.messaging as messaging from cereal import log from cereal.services import service_list from common.api import Api -from common.file_helpers import CallbackReader from common.basedir import PERSIST +from common.file_helpers import CallbackReader from common.params import Params from common.realtime import sec_since_boot from selfdrive.hardware import HARDWARE, PC, TICI from selfdrive.loggerd.config import ROOT from selfdrive.loggerd.xattr_cache import getxattr, setxattr -from selfdrive.swaglog import cloudlog, SWAGLOG_DIR -from selfdrive.version import get_version, get_origin, get_short_branch, get_commit from selfdrive.statsd import STATS_DIR +from selfdrive.swaglog import SWAGLOG_DIR, cloudlog +from selfdrive.version import get_commit, get_origin, get_short_branch, get_version ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) @@ -46,6 +48,7 @@ RECONNECT_TIMEOUT_S = 70 RETRY_DELAY = 10 # seconds MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately +MAX_AGE = 31 * 24 * 3600 # seconds WS_FRAME_SIZE = 4096 NetworkType = log.DeviceState.NetworkType @@ -170,7 +173,11 @@ def upload_handler(end_event: threading.Event) -> None: cancelled_uploads.remove(cur_upload_items[tid].id) continue - # TODO: remove item if too old + # Remove item if too old + age = datetime.now() - datetime.fromtimestamp(cur_upload_items[tid].created_at / 1000) + if age.total_seconds() > MAX_AGE: + cloudlog.event("athena.upload_handler.expired", item=cur_upload_items[tid], error=True) + continue # Check if uploading over cell is allowed sm.update(0) @@ -457,7 +464,7 @@ def getNetworks(): @dispatcher.add_method def takeSnapshot(): - from selfdrive.camerad.snapshot.snapshot import snapshot, jpeg_write + from selfdrive.camerad.snapshot.snapshot import jpeg_write, snapshot ret = snapshot() if ret is not None: def b64jpeg(x): diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index d59876f7ab..1742ed4347 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -8,6 +8,7 @@ import time import threading import queue import unittest +from datetime import datetime, timedelta from multiprocessing import Process from pathlib import Path @@ -240,6 +241,28 @@ class TestAthenadMethods(unittest.TestCase): finally: end_event.set() + def test_cancelExpiry(self): + t_future = datetime.now() - timedelta(days=40) + ts = int(t_future.strftime("%s")) * 1000 + + # Item that would time out if actually uploaded + fn = os.path.join(athenad.ROOT, 'qlog.bz2') + Path(fn).touch() + item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True) + + + end_event = threading.Event() + thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) + thread.start() + try: + athenad.upload_queue.put_nowait(item) + self.wait_for_upload() + time.sleep(0.1) + + self.assertEqual(athenad.upload_queue.qsize(), 0) + finally: + end_event.set() + def test_listUploadQueueEmpty(self): items = dispatcher["listUploadQueue"]() self.assertEqual(len(items), 0)