athena fixups (#19791)

* fix leak

* assert raises

* no q

* guess the queue did something
old-commit-hash: 7c82bac27e
commatwo_master
Adeeb Shihadeh 4 years ago committed by GitHub
parent 6a01983000
commit ebce6a6f56
  1. 2
      selfdrive/athena/athenad.py
  2. 49
      selfdrive/athena/tests/helpers.py
  3. 4
      selfdrive/athena/tests/test_athenad.py

@ -280,6 +280,8 @@ def ws_proxy_send(ws, local_sock, signal_sock, end_event):
cloudlog.exception("athenad.ws_proxy_send.exception") cloudlog.exception("athenad.ws_proxy_send.exception")
end_event.set() end_event.set()
signal_sock.close()
def ws_recv(ws, end_event): def ws_recv(ws, end_event):
while not end_event.is_set(): while not end_event.is_set():

@ -1,6 +1,4 @@
import http.server import http.server
import multiprocessing
import queue
import random import random
import requests import requests
import socket import socket
@ -8,6 +6,7 @@ import time
from functools import wraps from functools import wraps
from multiprocessing import Process from multiprocessing import Process
from common.timeout import Timeout
class EchoSocket(): class EchoSocket():
def __init__(self, port): def __init__(self, port):
@ -79,41 +78,27 @@ class HTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
self.end_headers() self.end_headers()
def http_server(port_queue, **kwargs):
while 1:
try:
port = random.randrange(40000, 50000)
port_queue.put(port)
http.server.test(**kwargs, port=port)
except OSError as e:
if e.errno == 98:
continue
def with_http_server(func): def with_http_server(func):
@wraps(func) @wraps(func)
def inner(*args, **kwargs): def inner(*args, **kwargs):
port_queue = multiprocessing.Queue() with Timeout(2, 'HTTP Server did not start'):
host = '127.0.0.1' p = None
p = Process(target=http_server, host = '127.0.0.1'
args=(port_queue,), while p is None or p.exitcode is not None:
kwargs={ port = random.randrange(40000, 50000)
'HandlerClass': HTTPRequestHandler, p = Process(target=http.server.test,
'bind': host}) kwargs={'port': port, 'HandlerClass': HTTPRequestHandler, 'bind': host})
p.start() p.start()
start = time.monotonic()
port = None
while 1:
if time.monotonic() - start > 10:
raise Exception('HTTP Server did not start')
try:
port = port_queue.get(timeout=0.1)
requests.put(f'http://{host}:{port}/qlog.bz2', data='')
break
except (requests.exceptions.ConnectionError, queue.Empty):
time.sleep(0.1) time.sleep(0.1)
with Timeout(2):
while True:
try:
requests.put(f'http://{host}:{port}/qlog.bz2', data='')
break
except requests.exceptions.ConnectionError:
time.sleep(0.1)
try: try:
return func(*args, f'http://{host}:{port}', **kwargs) return func(*args, f'http://{host}:{port}', **kwargs)
finally: finally:

@ -64,10 +64,8 @@ class TestAthenadMethods(unittest.TestCase):
try: try:
item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
try: with self.assertRaises(requests.exceptions.ConnectionError):
athenad._do_upload(item) athenad._do_upload(item)
except requests.exceptions.ConnectionError:
pass
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item) resp = athenad._do_upload(item)

Loading…
Cancel
Save