diff --git a/poetry.lock b/poetry.lock index 93e907a578..a4e47d2359 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3472,8 +3472,6 @@ files = [ {file = "pygame-2.5.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e24d05184e4195fe5ebcdce8b18ecb086f00182b9ae460a86682d312ce8d31f"}, {file = "pygame-2.5.2-cp311-cp311-win32.whl", hash = "sha256:f02c1c7505af18d426d355ac9872bd5c916b27f7b0fe224749930662bea47a50"}, {file = "pygame-2.5.2-cp311-cp311-win_amd64.whl", hash = "sha256:6d58c8cf937815d3b7cdc0fa9590c5129cb2c9658b72d00e8a4568dea2ff1d42"}, - {file = "pygame-2.5.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:1a2a43802bb5e89ce2b3b775744e78db4f9a201bf8d059b946c61722840ceea8"}, - {file = "pygame-2.5.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1c289f2613c44fe70a1e40769de4a49c5ab5a29b9376f1692bb1a15c9c1c9bfa"}, {file = "pygame-2.5.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:074aa6c6e110c925f7f27f00c7733c6303407edc61d738882985091d1eb2ef17"}, {file = "pygame-2.5.2-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fe0228501ec616779a0b9c4299e837877783e18df294dd690b9ab0eed3d8aaab"}, {file = "pygame-2.5.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:31648d38ecdc2335ffc0e38fb18a84b3339730521505dac68514f83a1092e3f4"}, @@ -3613,6 +3611,17 @@ files = [ [package.dependencies] cffi = ">=1.0.0" +[[package]] +name = "pympler" +version = "1.0.1" +description = "A development tool to measure, monitor and analyze the memory behavior of Python objects." +optional = false +python-versions = ">=3.6" +files = [ + {file = "Pympler-1.0.1-py3-none-any.whl", hash = "sha256:d260dda9ae781e1eab6ea15bacb84015849833ba5555f141d2d9b7b7473b307d"}, + {file = "Pympler-1.0.1.tar.gz", hash = "sha256:993f1a3599ca3f4fcd7160c7545ad06310c9e12f70174ae7ae8d4e25f6c5d3fa"}, +] + [[package]] name = "pyopencl" version = "2023.1.4" @@ -5179,4 +5188,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "~3.11" -content-hash = "acb0688e485872194c21e1313e20fc4a67084893b26e9b8cde1d66e3fdbb1282" +content-hash = "9538e574ca03437994b7b0a0b6cb41842256162a2f14abfd0da26587709f145a" diff --git a/pyproject.toml b/pyproject.toml index b87211cc92..c83669898f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,6 +140,7 @@ parameterized = "^0.8" pprofile = "*" pre-commit = "*" pygame = "*" +pympler = "*" pyprof2calltree = "*" pytest = "*" pytest-cov = "*" diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index 70e18bbedb..c93b434677 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -20,7 +20,7 @@ from dataclasses import asdict, dataclass, replace from datetime import datetime from functools import partial from queue import Queue -from typing import BinaryIO, Callable, Dict, List, Optional, Set, Union, cast +from typing import Callable, Dict, List, Optional, Set, Union, cast import requests from jsonrpc import JSONRPCResponseManager, dispatcher @@ -290,19 +290,15 @@ def _do_upload(upload_item: UploadItem, callback: Optional[Callable] = None) -> compress = True with open(path, "rb") as f: - data: BinaryIO + content = f.read() if compress: cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) - compressed = bz2.compress(f.read()) - size = len(compressed) - data = io.BytesIO(compressed) - else: - size = os.fstat(f.fileno()).st_size - data = f + content = bz2.compress(content) + with io.BytesIO(content) as data: return requests.put(upload_item.url, - data=CallbackReader(data, callback, size) if callback else data, - headers={**upload_item.headers, 'Content-Length': str(size)}, + data=CallbackReader(data, callback, len(content)) if callback else data, + headers={**upload_item.headers, 'Content-Length': str(len(content))}, timeout=30) diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index 27ccbdccc6..e81753a6a0 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -9,10 +9,11 @@ import queue import unittest from dataclasses import asdict, replace from datetime import datetime, timedelta +from parameterized import parameterized from typing import Optional from multiprocessing import Process -from pathlib import Path +from pympler.tracker import SummaryTracker from unittest import mock from websocket import ABNF from websocket._exceptions import WebSocketConnectionClosedException @@ -57,10 +58,11 @@ class TestAthenadMethods(unittest.TestCase): break @staticmethod - def _create_file(file: str, parent: Optional[str] = None) -> str: + def _create_file(file: str, parent: Optional[str] = None, data: bytes = b'') -> str: fn = os.path.join(Paths.log_root() if parent is None else parent, file) os.makedirs(os.path.dirname(fn), exist_ok=True) - Path(fn).touch() + with open(fn, 'wb') as f: + f.write(data) return fn @@ -137,19 +139,31 @@ class TestAthenadMethods(unittest.TestCase): if fn.endswith('.bz2'): self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4]) - + @parameterized.expand([(True,), (False,)]) @with_http_server - def test_do_upload(self, host): - fn = self._create_file('qlog.bz2') + def test_do_upload(self, compress, host): + # random bytes to ensure rather large object post-compression + fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) - item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') + # warm up object tracker + tracker = SummaryTracker() + for _ in range(5): + tracker.diff() + + upload_fn = fn + ('.bz2' if compress else '') + item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') with self.assertRaises(requests.exceptions.ConnectionError): athenad._do_upload(item) - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') resp = athenad._do_upload(item) self.assertEqual(resp.status_code, 201) + # assert memory cleaned up + for _type, num_objects, total_size in tracker.diff(): + with self.subTest(_type=_type): + self.assertLess(total_size / 1024, 10, f'Object {_type} ({num_objects=}) grew larger than 10 kB while uploading file') + @with_http_server def test_uploadFileToUrl(self, host): fn = self._create_file('qlog.bz2')