athena: expire items after 31 days (#23751)

* athena: expire items after 31 days

* add test
pull/23760/head
Willem Melching 3 years ago committed by GitHub
parent 7765bc2166
commit f03549c276
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      selfdrive/athena/athenad.py
  2. 23
      selfdrive/athena/tests/test_athenad.py

@ -4,37 +4,39 @@ import hashlib
import io
import json
import os
import sys
import queue
import random
import select
import socket
import subprocess
import sys
import tempfile
import threading
import time
import tempfile
import subprocess
from collections import namedtuple
from datetime import datetime
from functools import partial
from typing import Any, Dict
import requests
from jsonrpc import JSONRPCResponseManager, dispatcher
from websocket import ABNF, WebSocketTimeoutException, WebSocketException, create_connection
from websocket import (ABNF, WebSocketException, WebSocketTimeoutException,
create_connection)
import cereal.messaging as messaging
from cereal import log
from cereal.services import service_list
from common.api import Api
from common.file_helpers import CallbackReader
from common.basedir import PERSIST
from common.file_helpers import CallbackReader
from common.params import Params
from common.realtime import sec_since_boot
from selfdrive.hardware import HARDWARE, PC, TICI
from selfdrive.loggerd.config import ROOT
from selfdrive.loggerd.xattr_cache import getxattr, setxattr
from selfdrive.swaglog import cloudlog, SWAGLOG_DIR
from selfdrive.version import get_version, get_origin, get_short_branch, get_commit
from selfdrive.statsd import STATS_DIR
from selfdrive.swaglog import SWAGLOG_DIR, cloudlog
from selfdrive.version import get_commit, get_origin, get_short_branch, get_version
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
@ -46,6 +48,7 @@ RECONNECT_TIMEOUT_S = 70
RETRY_DELAY = 10 # seconds
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
MAX_AGE = 31 * 24 * 3600 # seconds
WS_FRAME_SIZE = 4096
NetworkType = log.DeviceState.NetworkType
@ -170,7 +173,11 @@ def upload_handler(end_event: threading.Event) -> None:
cancelled_uploads.remove(cur_upload_items[tid].id)
continue
# TODO: remove item if too old
# Remove item if too old
age = datetime.now() - datetime.fromtimestamp(cur_upload_items[tid].created_at / 1000)
if age.total_seconds() > MAX_AGE:
cloudlog.event("athena.upload_handler.expired", item=cur_upload_items[tid], error=True)
continue
# Check if uploading over cell is allowed
sm.update(0)
@ -457,7 +464,7 @@ def getNetworks():
@dispatcher.add_method
def takeSnapshot():
from selfdrive.camerad.snapshot.snapshot import snapshot, jpeg_write
from selfdrive.camerad.snapshot.snapshot import jpeg_write, snapshot
ret = snapshot()
if ret is not None:
def b64jpeg(x):

@ -8,6 +8,7 @@ import time
import threading
import queue
import unittest
from datetime import datetime, timedelta
from multiprocessing import Process
from pathlib import Path
@ -240,6 +241,28 @@ class TestAthenadMethods(unittest.TestCase):
finally:
end_event.set()
def test_cancelExpiry(self):
t_future = datetime.now() - timedelta(days=40)
ts = int(t_future.strftime("%s")) * 1000
# Item that would time out if actually uploaded
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item)
self.wait_for_upload()
time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
def test_listUploadQueueEmpty(self):
items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 0)

Loading…
Cancel
Save