athena: expire items after 31 days (#23751)

* athena: expire items after 31 days

* add test
old-commit-hash: f03549c276
commatwo_master
Willem Melching 3 years ago committed by GitHub
parent a7fa889605
commit 8788555f10
  1. 25
      selfdrive/athena/athenad.py
  2. 23
      selfdrive/athena/tests/test_athenad.py

@ -4,37 +4,39 @@ import hashlib
import io import io
import json import json
import os import os
import sys
import queue import queue
import random import random
import select import select
import socket import socket
import subprocess
import sys
import tempfile
import threading import threading
import time import time
import tempfile
import subprocess
from collections import namedtuple from collections import namedtuple
from datetime import datetime
from functools import partial from functools import partial
from typing import Any, Dict from typing import Any, Dict
import requests import requests
from jsonrpc import JSONRPCResponseManager, dispatcher from jsonrpc import JSONRPCResponseManager, dispatcher
from websocket import ABNF, WebSocketTimeoutException, WebSocketException, create_connection from websocket import (ABNF, WebSocketException, WebSocketTimeoutException,
create_connection)
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal import log from cereal import log
from cereal.services import service_list from cereal.services import service_list
from common.api import Api from common.api import Api
from common.file_helpers import CallbackReader
from common.basedir import PERSIST from common.basedir import PERSIST
from common.file_helpers import CallbackReader
from common.params import Params from common.params import Params
from common.realtime import sec_since_boot from common.realtime import sec_since_boot
from selfdrive.hardware import HARDWARE, PC, TICI from selfdrive.hardware import HARDWARE, PC, TICI
from selfdrive.loggerd.config import ROOT from selfdrive.loggerd.config import ROOT
from selfdrive.loggerd.xattr_cache import getxattr, setxattr from selfdrive.loggerd.xattr_cache import getxattr, setxattr
from selfdrive.swaglog import cloudlog, SWAGLOG_DIR
from selfdrive.version import get_version, get_origin, get_short_branch, get_commit
from selfdrive.statsd import STATS_DIR from selfdrive.statsd import STATS_DIR
from selfdrive.swaglog import SWAGLOG_DIR, cloudlog
from selfdrive.version import get_commit, get_origin, get_short_branch, get_version
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
@ -46,6 +48,7 @@ RECONNECT_TIMEOUT_S = 70
RETRY_DELAY = 10 # seconds RETRY_DELAY = 10 # seconds
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
MAX_AGE = 31 * 24 * 3600 # seconds
WS_FRAME_SIZE = 4096 WS_FRAME_SIZE = 4096
NetworkType = log.DeviceState.NetworkType NetworkType = log.DeviceState.NetworkType
@ -170,7 +173,11 @@ def upload_handler(end_event: threading.Event) -> None:
cancelled_uploads.remove(cur_upload_items[tid].id) cancelled_uploads.remove(cur_upload_items[tid].id)
continue continue
# TODO: remove item if too old # Remove item if too old
age = datetime.now() - datetime.fromtimestamp(cur_upload_items[tid].created_at / 1000)
if age.total_seconds() > MAX_AGE:
cloudlog.event("athena.upload_handler.expired", item=cur_upload_items[tid], error=True)
continue
# Check if uploading over cell is allowed # Check if uploading over cell is allowed
sm.update(0) sm.update(0)
@ -457,7 +464,7 @@ def getNetworks():
@dispatcher.add_method @dispatcher.add_method
def takeSnapshot(): def takeSnapshot():
from selfdrive.camerad.snapshot.snapshot import snapshot, jpeg_write from selfdrive.camerad.snapshot.snapshot import jpeg_write, snapshot
ret = snapshot() ret = snapshot()
if ret is not None: if ret is not None:
def b64jpeg(x): def b64jpeg(x):

@ -8,6 +8,7 @@ import time
import threading import threading
import queue import queue
import unittest import unittest
from datetime import datetime, timedelta
from multiprocessing import Process from multiprocessing import Process
from pathlib import Path from pathlib import Path
@ -240,6 +241,28 @@ class TestAthenadMethods(unittest.TestCase):
finally: finally:
end_event.set() end_event.set()
def test_cancelExpiry(self):
t_future = datetime.now() - timedelta(days=40)
ts = int(t_future.strftime("%s")) * 1000
# Item that would time out if actually uploaded
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item)
self.wait_for_upload()
time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
def test_listUploadQueueEmpty(self): def test_listUploadQueueEmpty(self):
items = dispatcher["listUploadQueue"]() items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 0) self.assertEqual(len(items), 0)

Loading…
Cancel
Save