diff --git a/.github/workflows/selfdrive_tests.yaml b/.github/workflows/selfdrive_tests.yaml index 9d92392b3a..ea20612c16 100644 --- a/.github/workflows/selfdrive_tests.yaml +++ b/.github/workflows/selfdrive_tests.yaml @@ -209,6 +209,7 @@ jobs: $UNIT_TEST system/loggerd && \ $UNIT_TEST selfdrive/car && \ $UNIT_TEST selfdrive/locationd && \ + $UNIT_TEST system/tests && \ $UNIT_TEST system/ubloxd && \ selfdrive/locationd/test/_test_locationd_lib.py && \ ./system/ubloxd/tests/test_glonass_runner && \ diff --git a/system/logmessaged.py b/system/logmessaged.py index 280a23cf1d..8f24471941 100755 --- a/system/logmessaged.py +++ b/system/logmessaged.py @@ -12,7 +12,7 @@ def main() -> NoReturn: log_handler.setFormatter(SwagLogFileFormatter(None)) log_level = 20 # logging.INFO - ctx = zmq.Context().instance() + ctx = zmq.Context.instance() sock = ctx.socket(zmq.PULL) sock.bind("ipc:///tmp/logmessage") @@ -20,23 +20,32 @@ def main() -> NoReturn: log_message_sock = messaging.pub_sock('logMessage') error_log_message_sock = messaging.pub_sock('errorLogMessage') - while True: - dat = b''.join(sock.recv_multipart()) - level = dat[0] - record = dat[1:].decode("utf-8") - if level >= log_level: - log_handler.emit(record) + try: + while True: + dat = b''.join(sock.recv_multipart()) + level = dat[0] + record = dat[1:].decode("utf-8") + if level >= log_level: + log_handler.emit(record) - # then we publish them - msg = messaging.new_message() - msg.logMessage = record - log_message_sock.send(msg.to_bytes()) + if len(record) > 2*1024*1024: + print("WARNING: log too big to publish", len(record)) + print(print(record[:100])) + continue - if level >= 40: # logging.ERROR + # then we publish them msg = messaging.new_message() - msg.errorLogMessage = record - error_log_message_sock.send(msg.to_bytes()) - + msg.logMessage = record + log_message_sock.send(msg.to_bytes()) + + if level >= 40: # logging.ERROR + msg = messaging.new_message() + msg.errorLogMessage = record + error_log_message_sock.send(msg.to_bytes()) + finally: + sock.close() + ctx.term() + log_handler.close() if __name__ == "__main__": main() diff --git a/system/tests/__init__.py b/system/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/system/tests/test_logmessaged.py b/system/tests/test_logmessaged.py new file mode 100755 index 0000000000..08335517ae --- /dev/null +++ b/system/tests/test_logmessaged.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import glob +import os +import shutil +import time +import unittest + +import cereal.messaging as messaging +from selfdrive.manager.process_config import managed_processes +from system.swaglog import cloudlog, SWAGLOG_DIR + + +class TestLogmessaged(unittest.TestCase): + + def setUp(self): + if os.path.exists(SWAGLOG_DIR): + shutil.rmtree(SWAGLOG_DIR) + + managed_processes['logmessaged'].start() + self.sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False) + self.error_sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False) + + # ensure sockets are connected + time.sleep(0.2) + messaging.drain_sock(self.sock) + messaging.drain_sock(self.error_sock) + + def tearDown(self): + del self.sock + del self.error_sock + managed_processes['logmessaged'].stop(block=True) + + def _get_log_files(self): + return list(glob.glob(os.path.join(SWAGLOG_DIR, "swaglog.*"))) + + def test_simple_log(self): + msgs = [f"abc {i}" for i in range(10)] + for m in msgs: + cloudlog.error(m) + time.sleep(3) + m = messaging.drain_sock(self.sock) + assert len(m) == len(msgs) + assert len(self._get_log_files()) >= 1 + + def test_big_log(self): + n = 10 + msg = "a"*3*1024*1024 + for _ in range(n): + cloudlog.info(msg) + time.sleep(3) + + msgs = messaging.drain_sock(self.sock) + assert len(msgs) == 0 + + logsize = sum([os.path.getsize(f) for f in self._get_log_files()]) + assert (n*len(msg)) < logsize < (n*(len(msg)+1024)) + + +if __name__ == "__main__": + unittest.main()