openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

55 lines
1.5 KiB

import glob
import os
import time
import cereal.messaging as messaging
from openpilot.system.manager.process_config import managed_processes
from openpilot.system.hardware.hw import Paths
from openpilot.common.swaglog import cloudlog, ipchandler
class TestLogmessaged:
def setup_method(self):
# clear the IPC buffer in case some other tests used cloudlog and filled it
ipchandler.close()
ipchandler.connect()
managed_processes['logmessaged'].start()
self.sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False)
self.error_sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False)
# ensure sockets are connected
time.sleep(1)
messaging.drain_sock(self.sock)
messaging.drain_sock(self.error_sock)
def teardown_method(self):
del self.sock
del self.error_sock
managed_processes['logmessaged'].stop(block=True)
def _get_log_files(self):
return list(glob.glob(os.path.join(Paths.swaglog_root(), "swaglog.*")))
def test_simple_log(self):
msgs = [f"abc {i}" for i in range(10)]
for m in msgs:
cloudlog.error(m)
time.sleep(1)
m = messaging.drain_sock(self.sock)
assert len(m) == len(msgs)
assert len(self._get_log_files()) >= 1
def test_big_log(self):
n = 10
msg = "a"*3*1024*1024
for _ in range(n):
cloudlog.info(msg)
time.sleep(1)
msgs = messaging.drain_sock(self.sock)
assert len(msgs) == 0
logsize = sum([os.path.getsize(f) for f in self._get_log_files()])
assert (n*len(msg)) < logsize < (n*(len(msg)+1024))