CI: Add valgrind (#2245)
	
		
	
				
					
				
			* prepare a test for valgrind by fixing linter issues * something went wrong * Try update cereal * Not working, fixing replay_learning.py * Testing * Testing * Working now * More logs, a bit of cleaning, still no idea how this works * Ok got 10% cleaning done * trying some stackOverflow code * Lots of cleaning * wip * Delete valgrind_testing.py * Delete replay_learning.py * Add to CI, cannot fail though * Add to CI, cannot fail though * Now the unit test should fail * Dump valgrind output in case of a failure * fix subrepos * fix subs * hopefully fixes every issue that we have * wait for message confirmation * Revert "wait for message confirmation" This reverts commitpull/2276/headad7ba280ec. * fix local issues * Revert "fix local issues" This reverts commit4498df21df. * attempt 3 to fix things * untested function extraction * refactor * add valgrind task * add delay and BASEDIR * Make test not fail, it is not designed for that * fix subrepositories * fix tests * Real fix now :) * scons * ... stupid CI * fix valgrind CI * Update test.yaml * Update test.yaml * Update test.yaml * add valgrind logs as file * remove delays * kill process more reliably * rename action * small cleanup * fix casting to int in case of large leaks Co-authored-by: Willem Melching <willem.melching@gmail.com>
							parent
							
								
									a77548bff6
								
							
						
					
					
						commit
						a09fd7c860
					
				
				 4 changed files with 131 additions and 0 deletions
			
			
		| @ -0,0 +1,103 @@ | ||||
| import os | ||||
| import threading | ||||
| import time | ||||
| import unittest | ||||
| import subprocess | ||||
| import signal | ||||
| 
 | ||||
| if "CI" in os.environ: | ||||
|   def tqdm(x): | ||||
|     return x | ||||
| else: | ||||
|   from tqdm import tqdm   # type: ignore | ||||
| 
 | ||||
| import cereal.messaging as messaging | ||||
| from collections import namedtuple | ||||
| from tools.lib.logreader import LogReader | ||||
| from selfdrive.test.process_replay.test_processes import get_segment | ||||
| from common.basedir import BASEDIR | ||||
| 
 | ||||
| ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'command', 'path', 'segment', 'wait_for_response']) | ||||
| 
 | ||||
| CONFIGS = [ | ||||
|   ProcessConfig( | ||||
|     proc_name="ubloxd", | ||||
|     pub_sub={ | ||||
|       "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"], | ||||
|     }, | ||||
|     ignore=[], | ||||
|     command="./ubloxd", | ||||
|     path="selfdrive/locationd/", | ||||
|     segment="0375fdf7b1ce594d|2019-06-13--08-32-25--3", | ||||
|     wait_for_response=True | ||||
|   ), | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| class TestValgrind(unittest.TestCase): | ||||
|   def extract_leak_sizes(self, log): | ||||
|     log = log.replace(",","")  # fixes casting to int issue with large leaks | ||||
|     err_lost1 = log.split("definitely lost: ")[1] | ||||
|     err_lost2 = log.split("indirectly lost: ")[1] | ||||
|     err_lost3 = log.split("possibly lost: ")[1] | ||||
|     definitely_lost = int(err_lost1.split(" ")[0]) | ||||
|     indirectly_lost = int(err_lost2.split(" ")[0]) | ||||
|     possibly_lost = int(err_lost3.split(" ")[0]) | ||||
|     return (definitely_lost, indirectly_lost, possibly_lost) | ||||
| 
 | ||||
|   def valgrindlauncher(self, arg, cwd): | ||||
|     os.chdir(os.path.join(BASEDIR, cwd)) | ||||
|     # Run valgrind on a process | ||||
|     command = "valgrind --leak-check=full " + arg | ||||
|     p = subprocess.Popen(command, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid)  # pylint: disable=W1509 | ||||
|     while not self.done: | ||||
|       time.sleep(0.1) | ||||
| 
 | ||||
|     os.killpg(os.getpgid(p.pid), signal.SIGINT) | ||||
|     _, err = p.communicate() | ||||
|     error_msg = str(err, encoding='utf-8') | ||||
|     with open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "a") as f: | ||||
|       f.write(error_msg) | ||||
|       f.write(5 * "\n") | ||||
|     definitely_lost, indirectly_lost, possibly_lost = self.extract_leak_sizes(error_msg) | ||||
|     if max(definitely_lost, indirectly_lost, possibly_lost) > 0: | ||||
|       self.leak = True | ||||
|       print("LEAKS from", arg, "\nDefinitely lost:", definitely_lost, "\nIndirectly lost", indirectly_lost, "\nPossibly lost", possibly_lost) | ||||
|     else: | ||||
|       self.leak = False | ||||
| 
 | ||||
|   def replay_process(self, config, logreader): | ||||
|     pub_sockets = [s for s in config.pub_sub.keys()]  # We dump data from logs here | ||||
|     sub_sockets = [s for _, sub in config.pub_sub.items() for s in sub]  # We get responses here | ||||
|     pm = messaging.PubMaster(pub_sockets) | ||||
|     sm = messaging.SubMaster(sub_sockets) | ||||
| 
 | ||||
|     print("Sorting logs") | ||||
|     all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime) | ||||
|     pub_msgs = [msg for msg in all_msgs if msg.which() in list(config.pub_sub.keys())] | ||||
| 
 | ||||
|     thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path)) | ||||
|     thread.daemon = True | ||||
|     thread.start() | ||||
| 
 | ||||
|     time.sleep(5)  # We give the process time to start | ||||
|     for msg in tqdm(pub_msgs): | ||||
|       pm.send(msg.which(), msg.as_builder()) | ||||
|       if config.wait_for_response: | ||||
|         sm.update(100) | ||||
| 
 | ||||
|     self.done = True | ||||
| 
 | ||||
|   def test_config(self): | ||||
|     open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "w") | ||||
| 
 | ||||
|     for cfg in CONFIGS: | ||||
|       self.done = False | ||||
|       URL = cfg.segment | ||||
|       lr = LogReader(get_segment(URL)) | ||||
|       self.replay_process(cfg, lr) | ||||
|       time.sleep(1)  # Wait for the logs to get written | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   unittest.main() | ||||
					Loading…
					
					
				
		Reference in new issue