Working now

pull/2217/head
Gregor Kikelj 5 years ago
parent e3cf793f99
commit 723cdded47
  1. 237
      selfdrive/test/replay_learning.py

@ -1,7 +1,7 @@
import capnp
import os
import sys
import threading
import time
if "CI" in os.environ:
def tqdm(x):
@ -9,21 +9,13 @@ if "CI" in os.environ:
else:
from tqdm import tqdm # type: ignore
# from cereal import car
from selfdrive.car.car_helpers import get_car
# import selfdrive.manager as manager
import cereal.messaging as messaging
from common.params import Params
# from cereal.services import service_list
from collections import namedtuple
from tools.lib.logreader import LogReader
TEST_ROUTE = "a74b011b32b51b56|2020-09-21--10-29-15"
# Numpy gives different results based on CPU features after version 19
NUMPY_TOLERANCE = 1e-7
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance'])
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore',])
def wait_for_event(evt):
@ -35,93 +27,14 @@ def wait_for_event(evt):
# done testing this process, let it die
sys.exit(0)
class FakeSocket:
def __init__(self, wait=True):
self.data = []
self.wait = wait
self.recv_called = threading.Event()
self.recv_ready = threading.Event()
def receive(self, non_blocking=False):
if non_blocking:
return None
if self.wait:
self.recv_called.set()
wait_for_event(self.recv_ready)
self.recv_ready.clear()
return self.data.pop()
def send(self, data):
if self.wait:
wait_for_event(self.recv_called)
self.recv_called.clear()
self.data.append(data)
if self.wait:
self.recv_ready.set()
def wait_for_recv(self):
wait_for_event(self.recv_called)
class DumbSocket:
def __init__(self, s=None):
if s is not None:
try:
dat = messaging.new_message(s)
except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member
# lists
dat = messaging.new_message(s, 0)
self.data = dat.to_bytes()
def receive(self, non_blocking=False):
return self.data
def send(self, dat):
pass
class FakeSubMaster(messaging.SubMaster):
def __init__(self, services):
super(FakeSubMaster, self).__init__(services, addr=None)
self.sock = {s: DumbSocket(s) for s in services}
self.update_called = threading.Event()
self.update_ready = threading.Event()
self.wait_on_getitem = False
def __getitem__(self, s):
# hack to know when fingerprinting is done
if self.wait_on_getitem:
self.update_called.set()
wait_for_event(self.update_ready)
self.update_ready.clear()
return self.data[s]
def update(self, timeout=-1):
self.update_called.set()
wait_for_event(self.update_ready)
self.update_ready.clear()
def update_msgs(self, cur_time, msgs):
wait_for_event(self.update_called)
self.update_called.clear()
super(FakeSubMaster, self).update_msgs(cur_time, msgs)
self.update_ready.set()
def wait_for_update(self):
wait_for_event(self.update_called)
class FakePubMaster(messaging.PubMaster):
class SimplePubMaster(messaging.PubMaster):
def __init__(self, services): # pylint: disable=super-init-not-called
self.sock = {}
for s in services:
self.sock[s] = messaging.pub_sock(s)
def send(self, s, dat):
print(dat)
self.sock[s].send(dat.to_bytes())
@ -148,16 +61,6 @@ def fingerprint(msgs, fsm, can_sock):
fsm.update_ready.set()
print("finished fingerprinting")
def get_car_params(msgs, fsm, can_sock):
can = FakeSocket(wait=False)
sendcan = FakeSocket(wait=False)
canmsgs = [msg for msg in msgs if msg.which() == 'can']
for m in canmsgs[:300]:
can.send(m.as_builder().to_bytes())
_, CP = get_car(can, sendcan)
Params().put("CarParams", CP.to_bytes())
def radar_rcv_callback(msg, CP, cfg, fsm):
if msg.which() != "can":
return [], False
@ -185,100 +88,20 @@ def calibration_rcv_callback(msg, CP, cfg, fsm):
return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry'
CONFIGS = [
ProcessConfig(
proc_name="controlsd",
pub_sub={
"can": ["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"],
"thermal": [], "health": [], "liveCalibration": [], "dMonitoringState": [], "plan": [], "pathPlan": [], "gpsLocation": [], "liveLocationKalman": [],
"model": [], "frame": [],
},
ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"],
init_callback=fingerprint,
should_recv_callback=None,
tolerance=None,
),
ProcessConfig(
proc_name="radard",
pub_sub={
"can": ["radarState", "liveTracks"],
"liveParameters": [], "controlsState": [], "model": [],
},
ignore=["logMonoTime", "valid", "radarState.cumLagMs"],
init_callback=get_car_params,
should_recv_callback=radar_rcv_callback,
tolerance=None,
),
ProcessConfig(
proc_name="plannerd",
pub_sub={
"model": ["pathPlan"], "radarState": ["plan"],
"carState": [], "controlsState": [], "liveParameters": [],
},
ignore=["logMonoTime", "valid", "plan.processingDelay"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=None,
),
ProcessConfig(
proc_name="calibrationd",
pub_sub={
"carState": ["liveCalibration"],
"cameraOdometry": []
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=calibration_rcv_callback,
tolerance=None,
),
ProcessConfig(
proc_name="dmonitoringd",
pub_sub={
"driverState": ["dMonitoringState"],
"liveCalibration": [], "carState": [], "model": [], "gpsLocation": [],
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
),
ProcessConfig(
proc_name="locationd",
pub_sub={
"cameraOdometry": ["liveLocationKalman"],
"sensorEvents": [], "gpsLocationExternal": [], "liveCalibration": [], "carState": [],
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
),
ProcessConfig(
proc_name="paramsd",
pub_sub={
"liveLocationKalman": ["liveParameters"],
"carState": []
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
),
ProcessConfig(
proc_name="ubloxd",
pub_sub={
"ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],
},
ignore=[],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
),
]
def valgrindlauncher(arg, cwd):
os.chdir(cwd)
# Run valgrind on a process
command = "valgrind " + arg
command = "valgrind --leak-check=full " + arg
print(command)
output = os.popen(command)
while True:
@ -288,20 +111,13 @@ def valgrindlauncher(arg, cwd):
print(s)
def replay_process(cfg, lr):
# sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]
pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can']
pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can'] # We dump data from logs here
fsm = FakeSubMaster(pub_sockets)
# fpm = FakePubMaster(sub_sockets)
fpm = SimplePubMaster(pub_sockets)
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
print("DUMPING1")
for x in all_msgs:
print(x.which())
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
print("DUMPING2")
for x in pub_msgs:
print(x)
params = Params()
params.clear_all()
params.manager_start()
@ -311,34 +127,23 @@ def replay_process(cfg, lr):
os.environ['NO_RADAR_SLEEP'] = "1"
thread = threading.Thread(target=valgrindlauncher, args=("./ubloxd ", "../locationd"))
thread = threading.Thread(target=valgrindlauncher, args=("./ubloxd & sleep 10; kill $!", "../locationd"))
thread.daemon = True
thread.start()
if cfg.init_callback is not None:
if 'can' not in list(cfg.pub_sub.keys()):
can_sock = None
cfg.init_callback(all_msgs, fsm, can_sock)
print("Got to here1")
# CP = car.CarParams.from_bytes(params.get("CarParams", block=True))
print("Got to here2")
log_msgs, msg_queue = [], []
for msg in tqdm(pub_msgs):
print("MSG incoming(not a chemical, a message)")
print(msg)
if msg.which() == 'can':
can_sock.send(msg.as_builder().to_bytes())
else:
msg_queue.append(msg.as_builder())
return log_msgs
# print(msg)
fpm.send(msg.which(), msg.as_builder())
URL = "https://commadata2.blob.core.windows.net/commadata2/a74b011b32b51b56/\
2020-09-21--10-29-15/0/rlog.bz2?se=2020-09-29T12%3A08%3A10Z&sp=r&sv=2018-03-2\
8&sr=b&rscd=attachment%3B%20filename%3Da74b011b32b51b56_2020-09-21--10-29-15--0--rlog.bz2\
&sig=0KVC2BWVlvW4yXTPpUm9Sim8Pp0HFKA5rNyTpnvHv8o%3D"
if __name__ == "__main__":
cfg = CONFIGS[-1]
lr = LogReader("https://commadataci.blob.core.windows.net/openpilotci/76b83eb0245de90e/2020-03-05--19-16-05/3/rlog.bz2")
cfg = CONFIGS[0]
lr = LogReader(URL)
print(str(cfg))
response = replay_process(cfg, lr)
for x in response:
print(x)
replay_process(cfg, lr)
time.sleep(15)

Loading…
Cancel
Save