"""Valgrind leak test: replay logged messages into a process under valgrind."""
import os
import threading
import time
import unittest
import subprocess

if "CI" in os.environ:
    # On CI the progress bar is replaced by a pass-through.
    def tqdm(x):
        return x
else:
    from tqdm import tqdm  # type: ignore

import cereal.messaging as messaging
from collections import namedtuple
from tools.lib.logreader import LogReader
from selfdrive.test.process_replay.test_processes import get_segment

ProcessConfig = namedtuple(
    'ProcessConfig',
    ['proc_name', 'pub_sub', 'ignore', 'command', 'path', 'segment', 'wait_for_response'],
)

CONFIGS = [
    ProcessConfig(
        proc_name="ubloxd",
        pub_sub={
            "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],
        },
        ignore=[],
        command="./ubloxd",
        path="../locationd",
        segment="0375fdf7b1ce594d|2019-06-13--08-32-25--3",
        wait_for_response=True
    ),
]

# Leak categories read from valgrind's summary ("<category> lost: N bytes ...").
_LEAK_CATEGORIES = ("definitely", "indirectly", "possibly")

# Printed by valgrind instead of a leak summary when nothing was left allocated.
_NO_LEAKS_MARKER = "no leaks are possible"


def _parse_lost_bytes(valgrind_output, category):
    """Return the byte count following "<category> lost: " in valgrind output.

    Returns None when the marker is absent. Thousands separators
    (e.g. "1,024") are stripped before conversion.
    """
    _, marker, tail = valgrind_output.partition(category + " lost: ")
    if not marker:
        return None
    return int(tail.split(" ")[0].replace(",", ""))


class TestValgrind(unittest.TestCase):
    """Runs a configured process under valgrind while replaying logged messages."""

    def setUp(self):
        # None means "valgrind has not reported yet"; valgrindlauncher
        # overwrites it with True or False.
        self.leak = None

    def valgrindlauncher(self, arg, cwd):
        """Run `arg` under valgrind in directory `cwd` and record the verdict.

        Sets self.leak to True when any of the definitely/indirectly/possibly
        lost byte counts is positive, or when the output cannot be
        interpreted; sets it to False otherwise.
        """
        # Run valgrind on a process; the shell kills it after 20 seconds.
        command = "valgrind --leak-check=full " + arg + " & sleep 20; kill $!"
        # cwd is handed to Popen rather than os.chdir so the working directory
        # of the test process itself (shared by all threads) is left untouched.
        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             shell=True, cwd=cwd)
        _, err = p.communicate()
        error_msg = str(err, encoding='utf-8')

        lost = [_parse_lost_bytes(error_msg, category) for category in _LEAK_CATEGORIES]
        if None in lost:
            # No leak summary: either everything was freed, or valgrind did
            # not produce usable output. Only the former counts as a pass.
            self.leak = _NO_LEAKS_MARKER not in error_msg
            if self.leak:
                print("Could not find a leak summary in the valgrind output:")
                print(error_msg)
            return

        if max(lost) > 0:
            self.leak = True
            print(*lost)
            return
        self.leak = False

    def replay_process(self, config, logreader):
        """Start config.command under valgrind and feed it messages from logreader.

        Messages whose type is a key of config.pub_sub are published in
        logMonoTime order. When config.wait_for_response is set, each send
        waits until at least one subscribed socket reports an update.
        """
        pub_sockets = list(config.pub_sub)  # We dump data from logs here
        sub_sockets = [s for subs in config.pub_sub.values() for s in subs]  # We get responses here

        pm = messaging.PubMaster(pub_sockets)
        sm = messaging.SubMaster(sub_sockets)

        print("Sorting logs")
        all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime)
        pub_msgs = [msg for msg in all_msgs if msg.which() in config.pub_sub]

        thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path))
        thread.daemon = True
        thread.start()

        time.sleep(5)  # We give the process time to start
        for msg in tqdm(pub_msgs):
            pm.send(msg.which(), msg.as_builder())
            if config.wait_for_response:
                # NOTE(review): there is no upper bound on this wait; it
                # presumably spins forever if the process stops responding.
                while True:
                    sm.update()
                    if any(sm.updated[s] for s in sub_sockets):
                        break

    def test_config_0(self):
        """Replay the first configured segment and require a leak-free report."""
        cfg = CONFIGS[0]

        URL = cfg.segment
        lr = LogReader(get_segment(URL))
        self.replay_process(cfg, lr)
        # Wait for the replay to complete
        time.sleep(30)
        self.assertIsNotNone(self.leak, "valgrind did not report a result")
        self.assertFalse(self.leak)


if __name__ == "__main__":
    unittest.main()