From e3cf793f998d597a0d704c472051ffbecfdfec09 Mon Sep 17 00:00:00 2001 From: Gregor Kikelj Date: Tue, 29 Sep 2020 12:04:18 +0200 Subject: [PATCH] Testing --- selfdrive/test/replay_learning.py | 344 ++++++++++++++++++++++++++++++ 1 file changed, 344 insertions(+) create mode 100644 selfdrive/test/replay_learning.py diff --git a/selfdrive/test/replay_learning.py b/selfdrive/test/replay_learning.py new file mode 100644 index 0000000000..853642e8b3 --- /dev/null +++ b/selfdrive/test/replay_learning.py @@ -0,0 +1,344 @@ +import capnp +import os +import sys +import threading + +if "CI" in os.environ: + def tqdm(x): + return x +else: + from tqdm import tqdm # type: ignore + +# from cereal import car +from selfdrive.car.car_helpers import get_car +# import selfdrive.manager as manager +import cereal.messaging as messaging +from common.params import Params +# from cereal.services import service_list +from collections import namedtuple +from tools.lib.logreader import LogReader + +TEST_ROUTE = "a74b011b32b51b56|2020-09-21--10-29-15" + +# Numpy gives different results based on CPU features after version 19 +NUMPY_TOLERANCE = 1e-7 + +ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance']) + + +def wait_for_event(evt): + if not evt.wait(20): + if threading.currentThread().getName() == "MainThread": + # tested process likely died. don't let test just hang + raise Exception("Timeout reached. Tested process likely crashed.") + else: + # done testing this process, let it die + sys.exit(0) + +class FakeSocket: + def __init__(self, wait=True): + self.data = [] + self.wait = wait + self.recv_called = threading.Event() + self.recv_ready = threading.Event() + + def receive(self, non_blocking=False): + if non_blocking: + return None + + if self.wait: + self.recv_called.set() + wait_for_event(self.recv_ready) + self.recv_ready.clear() + return self.data.pop() + + def send(self, data): + if self.wait: + wait_for_event(self.recv_called) + self.recv_called.clear() + + self.data.append(data) + + if self.wait: + self.recv_ready.set() + + def wait_for_recv(self): + wait_for_event(self.recv_called) + + +class DumbSocket: + def __init__(self, s=None): + if s is not None: + try: + dat = messaging.new_message(s) + except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member + # lists + dat = messaging.new_message(s, 0) + self.data = dat.to_bytes() + + def receive(self, non_blocking=False): + return self.data + + def send(self, dat): + pass + + +class FakeSubMaster(messaging.SubMaster): + def __init__(self, services): + super(FakeSubMaster, self).__init__(services, addr=None) + self.sock = {s: DumbSocket(s) for s in services} + self.update_called = threading.Event() + self.update_ready = threading.Event() + + self.wait_on_getitem = False + + def __getitem__(self, s): + # hack to know when fingerprinting is done + if self.wait_on_getitem: + self.update_called.set() + wait_for_event(self.update_ready) + self.update_ready.clear() + return self.data[s] + + def update(self, timeout=-1): + self.update_called.set() + wait_for_event(self.update_ready) + self.update_ready.clear() + + def update_msgs(self, cur_time, msgs): + wait_for_event(self.update_called) + self.update_called.clear() + super(FakeSubMaster, self).update_msgs(cur_time, msgs) + self.update_ready.set() + + def wait_for_update(self): + wait_for_event(self.update_called) + + +class FakePubMaster(messaging.PubMaster): + def __init__(self, services): # pylint: disable=super-init-not-called + self.sock = {} + for s in services: + self.sock[s] = messaging.pub_sock(s) + + def send(self, s, dat): + self.sock[s].send(dat.to_bytes()) + + +def fingerprint(msgs, fsm, can_sock): + print("start fingerprinting") + fsm.wait_on_getitem = True + + # populate fake socket with data for fingerprinting + canmsgs = [msg for msg in msgs if msg.which() == "can"] + wait_for_event(can_sock.recv_called) + can_sock.recv_called.clear() + can_sock.data = [msg.as_builder().to_bytes() for msg in canmsgs[:300]] + can_sock.recv_ready.set() + can_sock.wait = False + + # we know fingerprinting is done when controlsd sets sm['pathPlan'].sensorValid + wait_for_event(fsm.update_called) + fsm.update_called.clear() + + fsm.wait_on_getitem = False + can_sock.wait = True + can_sock.data = [] + + fsm.update_ready.set() + print("finished fingerprinting") + +def get_car_params(msgs, fsm, can_sock): + can = FakeSocket(wait=False) + sendcan = FakeSocket(wait=False) + + canmsgs = [msg for msg in msgs if msg.which() == 'can'] + for m in canmsgs[:300]: + can.send(m.as_builder().to_bytes()) + _, CP = get_car(can, sendcan) + Params().put("CarParams", CP.to_bytes()) + +def radar_rcv_callback(msg, CP, cfg, fsm): + if msg.which() != "can": + return [], False + elif CP.radarOffCan: + return ["radarState", "liveTracks"], True + + radar_msgs = {"honda": [0x445], "toyota": [0x19f, 0x22f], "gm": [0x474], + "chrysler": [0x2d4]}.get(CP.carName, None) + + if radar_msgs is None: + raise NotImplementedError + + for m in msg.can: + if m.src == 1 and m.address in radar_msgs: + return ["radarState", "liveTracks"], True + return [], False + +def calibration_rcv_callback(msg, CP, cfg, fsm): + # calibrationd publishes 1 calibrationData every 5 cameraOdometry packets. + # should_recv always true to increment frame + recv_socks = [] + frame = fsm.frame + 1 # incrementing hasn't happened yet in SubMaster + if frame == 0 or (msg.which() == 'cameraOdometry' and (frame % 5) == 0): + recv_socks = ["liveCalibration"] + return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry' + +CONFIGS = [ + ProcessConfig( + proc_name="controlsd", + pub_sub={ + "can": ["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"], + "thermal": [], "health": [], "liveCalibration": [], "dMonitoringState": [], "plan": [], "pathPlan": [], "gpsLocation": [], "liveLocationKalman": [], + "model": [], "frame": [], + }, + ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"], + init_callback=fingerprint, + should_recv_callback=None, + tolerance=None, + ), + ProcessConfig( + proc_name="radard", + pub_sub={ + "can": ["radarState", "liveTracks"], + "liveParameters": [], "controlsState": [], "model": [], + }, + ignore=["logMonoTime", "valid", "radarState.cumLagMs"], + init_callback=get_car_params, + should_recv_callback=radar_rcv_callback, + tolerance=None, + ), + ProcessConfig( + proc_name="plannerd", + pub_sub={ + "model": ["pathPlan"], "radarState": ["plan"], + "carState": [], "controlsState": [], "liveParameters": [], + }, + ignore=["logMonoTime", "valid", "plan.processingDelay"], + init_callback=get_car_params, + should_recv_callback=None, + tolerance=None, + ), + ProcessConfig( + proc_name="calibrationd", + pub_sub={ + "carState": ["liveCalibration"], + "cameraOdometry": [] + }, + ignore=["logMonoTime", "valid"], + init_callback=get_car_params, + should_recv_callback=calibration_rcv_callback, + tolerance=None, + ), + ProcessConfig( + proc_name="dmonitoringd", + pub_sub={ + "driverState": ["dMonitoringState"], + "liveCalibration": [], "carState": [], "model": [], "gpsLocation": [], + }, + ignore=["logMonoTime", "valid"], + init_callback=get_car_params, + should_recv_callback=None, + tolerance=NUMPY_TOLERANCE, + ), + ProcessConfig( + proc_name="locationd", + pub_sub={ + "cameraOdometry": ["liveLocationKalman"], + "sensorEvents": [], "gpsLocationExternal": [], "liveCalibration": [], "carState": [], + }, + ignore=["logMonoTime", "valid"], + init_callback=get_car_params, + should_recv_callback=None, + tolerance=NUMPY_TOLERANCE, + ), + ProcessConfig( + proc_name="paramsd", + pub_sub={ + "liveLocationKalman": ["liveParameters"], + "carState": [] + }, + ignore=["logMonoTime", "valid"], + init_callback=get_car_params, + should_recv_callback=None, + tolerance=NUMPY_TOLERANCE, + ), + ProcessConfig( + proc_name="ubloxd", + pub_sub={ + "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"], + }, + ignore=[], + init_callback=get_car_params, + should_recv_callback=None, + tolerance=NUMPY_TOLERANCE, + ), +] +def valgrindlauncher(arg, cwd): + os.chdir(cwd) + + # Run valgrind on a process + command = "valgrind " + arg + print(command) + output = os.popen(command) + while True: + s = output.read() + if s == "": + break + print(s) + +def replay_process(cfg, lr): + # sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] + pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can'] + + fsm = FakeSubMaster(pub_sockets) + # fpm = FakePubMaster(sub_sockets) + + all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) + print("DUMPING1") + for x in all_msgs: + print(x.which()) + pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] + print("DUMPING2") + for x in pub_msgs: + print(x) + params = Params() + params.clear_all() + params.manager_start() + params.put("OpenpilotEnabledToggle", "1") + params.put("Passive", "0") + params.put("CommunityFeaturesToggle", "1") + + os.environ['NO_RADAR_SLEEP'] = "1" + + thread = threading.Thread(target=valgrindlauncher, args=("./ubloxd ", "../locationd")) + thread.daemon = True + thread.start() + + if cfg.init_callback is not None: + if 'can' not in list(cfg.pub_sub.keys()): + can_sock = None + cfg.init_callback(all_msgs, fsm, can_sock) + print("Got to here1") + # CP = car.CarParams.from_bytes(params.get("CarParams", block=True)) + print("Got to here2") + + log_msgs, msg_queue = [], [] + for msg in tqdm(pub_msgs): + print("MSG incoming(not a chemical, a message)") + print(msg) + + if msg.which() == 'can': + can_sock.send(msg.as_builder().to_bytes()) + else: + msg_queue.append(msg.as_builder()) + + return log_msgs + +if __name__ == "__main__": + cfg = CONFIGS[-1] + lr = LogReader("https://commadataci.blob.core.windows.net/openpilotci/76b83eb0245de90e/2020-03-05--19-16-05/3/rlog.bz2") + print(str(cfg)) + response = replay_process(cfg, lr) + for x in response: + print(x)