openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

117 lines
3.7 KiB

#!/usr/bin/env python3
import os
import threading
import time
import unittest
import subprocess
import signal
if "CI" in os.environ:
    # When the CI environment variable is set, skip the progress bar and
    # iterate the messages directly.
    def tqdm(x):
        """Return ``x`` unchanged; stand-in for ``tqdm.tqdm``."""
        return x
else:
    try:
        from tqdm import tqdm  # type: ignore
    except ImportError:
        # The progress bar is purely cosmetic, so a missing tqdm package
        # must not make the whole test module unimportable.
        def tqdm(x):
            """Return ``x`` unchanged; stand-in for ``tqdm.tqdm``."""
            return x
import cereal.messaging as messaging
from collections import namedtuple
from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.openpilotci import get_url
from openpilot.common.basedir import BASEDIR
# Description of one process to replay under valgrind.
ProcessConfig = namedtuple(
    'ProcessConfig',
    [
        'proc_name',          # name of the process
        'pub_sub',            # published socket -> list of sockets to subscribe to
        'ignore',             # not read anywhere in this file -- TODO confirm intended use
        'command',            # command line appended to the valgrind invocation
        'path',               # directory, relative to BASEDIR, the command is started from
        'segment',            # "<route>--<segment number>" of the log to replay
        'wait_for_response',  # update the subscribers after every sent message
    ],
)

# Every entry is replayed once by TestValgrind.test_config.
CONFIGS = [
    ProcessConfig(
        proc_name="ubloxd",
        pub_sub={"ubloxRaw": ["ubloxGnss", "gpsLocationExternal"]},
        ignore=[],
        command="./ubloxd",
        path="system/ubloxd",
        segment="0375fdf7b1ce594d|2019-06-13--08-32-25--3",
        wait_for_response=True,
    ),
]
class TestValgrind(unittest.TestCase):
def extract_leak_sizes(self, log):
if "All heap blocks were freed -- no leaks are possible" in log:
return (0,0,0)
log = log.replace(",","") # fixes casting to int issue with large leaks
err_lost1 = log.split("definitely lost: ")[1]
err_lost2 = log.split("indirectly lost: ")[1]
err_lost3 = log.split("possibly lost: ")[1]
definitely_lost = int(err_lost1.split(" ")[0])
indirectly_lost = int(err_lost2.split(" ")[0])
possibly_lost = int(err_lost3.split(" ")[0])
return (definitely_lost, indirectly_lost, possibly_lost)
def valgrindlauncher(self, arg, cwd):
os.chdir(os.path.join(BASEDIR, cwd))
# Run valgrind on a process
command = "valgrind --leak-check=full " + arg
p = subprocess.Popen(command, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid)
while not self.replay_done:
time.sleep(0.1)
# Kill valgrind and extract leak output
os.killpg(os.getpgid(p.pid), signal.SIGINT)
_, err = p.communicate()
error_msg = str(err, encoding='utf-8')
with open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "a") as f:
f.write(error_msg)
f.write(5 * "\n")
definitely_lost, indirectly_lost, possibly_lost = self.extract_leak_sizes(error_msg)
if max(definitely_lost, indirectly_lost, possibly_lost) > 0:
self.leak = True
print("LEAKS from", arg, "\nDefinitely lost:", definitely_lost, "\nIndirectly lost", indirectly_lost, "\nPossibly lost", possibly_lost)
else:
self.leak = False
def replay_process(self, config, logreader):
pub_sockets = list(config.pub_sub.keys()) # We dump data from logs here
sub_sockets = [s for _, sub in config.pub_sub.items() for s in sub] # We get responses here
pm = messaging.PubMaster(pub_sockets)
sm = messaging.SubMaster(sub_sockets)
print("Sorting logs")
all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(config.pub_sub.keys())]
thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path))
thread.daemon = True
thread.start()
while not all(pm.all_readers_updated(s) for s in config.pub_sub.keys()):
time.sleep(0)
for msg in tqdm(pub_msgs):
pm.send(msg.which(), msg.as_builder())
if config.wait_for_response:
sm.update(100)
self.replay_done = True
def test_config(self):
open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "w").close()
for cfg in CONFIGS:
self.leak = None
self.replay_done = False
r, n = cfg.segment.rsplit("--", 1)
lr = LogReader(get_url(r, n))
self.replay_process(cfg, lr)
while self.leak is None:
time.sleep(0.1) # Wait for the valgrind to finish
self.assertFalse(self.leak)
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner.
    unittest.main()