From ad7ba280ec6536015ac1a442c9732bfe42fd5664 Mon Sep 17 00:00:00 2001 From: Gregor Kikelj Date: Thu, 1 Oct 2020 12:59:28 +0200 Subject: [PATCH] wait for message confirmation --- opendbc | 2 +- panda | 2 +- selfdrive/test/test_valgrind_replay.py | 64 +++++++++++--------------- 3 files changed, 29 insertions(+), 39 deletions(-) diff --git a/opendbc b/opendbc index e4c1664e5a..209d968b75 160000 --- a/opendbc +++ b/opendbc @@ -1 +1 @@ -Subproject commit e4c1664e5a87f4ff012e7038e44759f37e21adb0 +Subproject commit 209d968b750ac9aa8ee91d48b6ad639e151b98af diff --git a/panda b/panda index 8b41ed3b81..09997428f3 160000 --- a/panda +++ b/panda @@ -1 +1 @@ -Subproject commit 8b41ed3b81b0315a5f9c9aa0cad84bfaef91f3f4 +Subproject commit 09997428f3417c6eb137dffe1b7639455401efaa diff --git a/selfdrive/test/test_valgrind_replay.py b/selfdrive/test/test_valgrind_replay.py index d838fbfa82..4611c3e621 100644 --- a/selfdrive/test/test_valgrind_replay.py +++ b/selfdrive/test/test_valgrind_replay.py @@ -13,10 +13,8 @@ else: import cereal.messaging as messaging from collections import namedtuple from tools.lib.logreader import LogReader - -ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'command', 'path']) - -BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/" +from selfdrive.test.process_replay.test_processes import get_segment +ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'command', 'path', 'segment', 'wait_for_response']) CONFIGS = [ ProcessConfig( @@ -25,28 +23,20 @@ CONFIGS = [ "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"], }, ignore=[], - command="./ubloxd & sleep 20; kill $!", + command="./ubloxd", path="../locationd", + segment="0375fdf7b1ce594d|2019-06-13--08-32-25--3", + wait_for_response=True ), ] -class SimplePubMaster(): - def __init__(self, services): # pylint: disable=super-init-not-called - self.sock = {} - for s in services: - self.sock[s] = messaging.pub_sock(s) - - def send(self, s, dat): - # print(dat) - self.sock[s].send(dat.to_bytes()) - class TestValgrind(unittest.TestCase): def valgrindlauncher(self, arg, cwd): os.chdir(cwd) # Run valgrind on a process - command = "valgrind --leak-check=full " + arg + command = "valgrind --leak-check=full " + arg + " & sleep 20; kill $!" p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) _, err = p.communicate() @@ -59,45 +49,45 @@ class TestValgrind(unittest.TestCase): possibly_lost_amount = int(err_lost3.split(" ")[0]) if max(definitely_lost_amount, indirectly_lost_amount, possibly_lost_amount) > 0: self.leak = True - print(err) + print(definitely_lost_amount, indirectly_lost_amount, possibly_lost_amount) + # print(err) return self.leak = False - def replay_process(self, cfg, lr): - pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can'] # We dump data from logs here + def replay_process(self, config, logreader): + pub_sockets = [s for s in config.pub_sub.keys()] # We dump data from logs here + sub_sockets = [s for _, sub in config.pub_sub.items() for s in sub] # We get responses here + pm = messaging.PubMaster(pub_sockets) + sm = messaging.SubMaster(sub_sockets) - pm = SimplePubMaster(pub_sockets) print("Sorting logs") - all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) - pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] + all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime) + pub_msgs = [msg for msg in all_msgs if msg.which() in list(config.pub_sub.keys())] - thread = threading.Thread(target=self.valgrindlauncher, args=(cfg.command, cfg.path)) + thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path)) thread.daemon = True thread.start() + time.sleep(5) # We give the process time to start for msg in tqdm(pub_msgs): pm.send(msg.which(), msg.as_builder()) - - def get_segment(self, segment_name, original=True): - route_name, segment_num = segment_name.rsplit("--", 1) - if original: - rlog_url = BASE_URL + "%s/%s/rlog.bz2" % (route_name.replace("|", "/"), segment_num) - else: - process_replay_dir = os.path.dirname(os.path.abspath(__file__)) - model_ref_commit = open(os.path.join(process_replay_dir, "model_ref_commit")).read().strip() - rlog_url = BASE_URL + "%s/%s/rlog_%s.bz2" % (route_name.replace("|", "/"), segment_num, model_ref_commit) - - return rlog_url + if config.wait_for_response: + gotResponse = False + while not gotResponse: + sm.update() + for s in sub_sockets: + if sm.updated[s]: + gotResponse = True def test_config_0(self): cfg = CONFIGS[0] - URL = self.get_segment("0375fdf7b1ce594d|2019-06-13--08-32-25--3") - lr = LogReader(URL) + URL = cfg.segment + lr = LogReader(get_segment(URL)) self.replay_process(cfg, lr) # Wait for the replay to complete time.sleep(30) - assert not self.leak + self.assertFalse(self.leak) if __name__ == "__main__": unittest.main()