parent
a445232d8c
commit
e3cf793f99
1 changed files with 344 additions and 0 deletions
@ -0,0 +1,344 @@ |
||||
import capnp |
||||
import os |
||||
import sys |
||||
import threading |
||||
|
||||
if "CI" in os.environ: |
||||
def tqdm(x): |
||||
return x |
||||
else: |
||||
from tqdm import tqdm # type: ignore |
||||
|
||||
# from cereal import car |
||||
from selfdrive.car.car_helpers import get_car |
||||
# import selfdrive.manager as manager |
||||
import cereal.messaging as messaging |
||||
from common.params import Params |
||||
# from cereal.services import service_list |
||||
from collections import namedtuple |
||||
from tools.lib.logreader import LogReader |
||||
|
||||
TEST_ROUTE = "a74b011b32b51b56|2020-09-21--10-29-15" |
||||
|
||||
# Numpy gives different results based on CPU features after version 19 |
||||
NUMPY_TOLERANCE = 1e-7 |
||||
|
||||
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance']) |
||||
|
||||
|
||||
def wait_for_event(evt): |
||||
if not evt.wait(20): |
||||
if threading.currentThread().getName() == "MainThread": |
||||
# tested process likely died. don't let test just hang |
||||
raise Exception("Timeout reached. Tested process likely crashed.") |
||||
else: |
||||
# done testing this process, let it die |
||||
sys.exit(0) |
||||
|
||||
class FakeSocket: |
||||
def __init__(self, wait=True): |
||||
self.data = [] |
||||
self.wait = wait |
||||
self.recv_called = threading.Event() |
||||
self.recv_ready = threading.Event() |
||||
|
||||
def receive(self, non_blocking=False): |
||||
if non_blocking: |
||||
return None |
||||
|
||||
if self.wait: |
||||
self.recv_called.set() |
||||
wait_for_event(self.recv_ready) |
||||
self.recv_ready.clear() |
||||
return self.data.pop() |
||||
|
||||
def send(self, data): |
||||
if self.wait: |
||||
wait_for_event(self.recv_called) |
||||
self.recv_called.clear() |
||||
|
||||
self.data.append(data) |
||||
|
||||
if self.wait: |
||||
self.recv_ready.set() |
||||
|
||||
def wait_for_recv(self): |
||||
wait_for_event(self.recv_called) |
||||
|
||||
|
||||
class DumbSocket: |
||||
def __init__(self, s=None): |
||||
if s is not None: |
||||
try: |
||||
dat = messaging.new_message(s) |
||||
except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member |
||||
# lists |
||||
dat = messaging.new_message(s, 0) |
||||
self.data = dat.to_bytes() |
||||
|
||||
def receive(self, non_blocking=False): |
||||
return self.data |
||||
|
||||
def send(self, dat): |
||||
pass |
||||
|
||||
|
||||
class FakeSubMaster(messaging.SubMaster): |
||||
def __init__(self, services): |
||||
super(FakeSubMaster, self).__init__(services, addr=None) |
||||
self.sock = {s: DumbSocket(s) for s in services} |
||||
self.update_called = threading.Event() |
||||
self.update_ready = threading.Event() |
||||
|
||||
self.wait_on_getitem = False |
||||
|
||||
def __getitem__(self, s): |
||||
# hack to know when fingerprinting is done |
||||
if self.wait_on_getitem: |
||||
self.update_called.set() |
||||
wait_for_event(self.update_ready) |
||||
self.update_ready.clear() |
||||
return self.data[s] |
||||
|
||||
def update(self, timeout=-1): |
||||
self.update_called.set() |
||||
wait_for_event(self.update_ready) |
||||
self.update_ready.clear() |
||||
|
||||
def update_msgs(self, cur_time, msgs): |
||||
wait_for_event(self.update_called) |
||||
self.update_called.clear() |
||||
super(FakeSubMaster, self).update_msgs(cur_time, msgs) |
||||
self.update_ready.set() |
||||
|
||||
def wait_for_update(self): |
||||
wait_for_event(self.update_called) |
||||
|
||||
|
||||
class FakePubMaster(messaging.PubMaster): |
||||
def __init__(self, services): # pylint: disable=super-init-not-called |
||||
self.sock = {} |
||||
for s in services: |
||||
self.sock[s] = messaging.pub_sock(s) |
||||
|
||||
def send(self, s, dat): |
||||
self.sock[s].send(dat.to_bytes()) |
||||
|
||||
|
||||
def fingerprint(msgs, fsm, can_sock): |
||||
print("start fingerprinting") |
||||
fsm.wait_on_getitem = True |
||||
|
||||
# populate fake socket with data for fingerprinting |
||||
canmsgs = [msg for msg in msgs if msg.which() == "can"] |
||||
wait_for_event(can_sock.recv_called) |
||||
can_sock.recv_called.clear() |
||||
can_sock.data = [msg.as_builder().to_bytes() for msg in canmsgs[:300]] |
||||
can_sock.recv_ready.set() |
||||
can_sock.wait = False |
||||
|
||||
# we know fingerprinting is done when controlsd sets sm['pathPlan'].sensorValid |
||||
wait_for_event(fsm.update_called) |
||||
fsm.update_called.clear() |
||||
|
||||
fsm.wait_on_getitem = False |
||||
can_sock.wait = True |
||||
can_sock.data = [] |
||||
|
||||
fsm.update_ready.set() |
||||
print("finished fingerprinting") |
||||
|
||||
def get_car_params(msgs, fsm, can_sock): |
||||
can = FakeSocket(wait=False) |
||||
sendcan = FakeSocket(wait=False) |
||||
|
||||
canmsgs = [msg for msg in msgs if msg.which() == 'can'] |
||||
for m in canmsgs[:300]: |
||||
can.send(m.as_builder().to_bytes()) |
||||
_, CP = get_car(can, sendcan) |
||||
Params().put("CarParams", CP.to_bytes()) |
||||
|
||||
def radar_rcv_callback(msg, CP, cfg, fsm): |
||||
if msg.which() != "can": |
||||
return [], False |
||||
elif CP.radarOffCan: |
||||
return ["radarState", "liveTracks"], True |
||||
|
||||
radar_msgs = {"honda": [0x445], "toyota": [0x19f, 0x22f], "gm": [0x474], |
||||
"chrysler": [0x2d4]}.get(CP.carName, None) |
||||
|
||||
if radar_msgs is None: |
||||
raise NotImplementedError |
||||
|
||||
for m in msg.can: |
||||
if m.src == 1 and m.address in radar_msgs: |
||||
return ["radarState", "liveTracks"], True |
||||
return [], False |
||||
|
||||
def calibration_rcv_callback(msg, CP, cfg, fsm): |
||||
# calibrationd publishes 1 calibrationData every 5 cameraOdometry packets. |
||||
# should_recv always true to increment frame |
||||
recv_socks = [] |
||||
frame = fsm.frame + 1 # incrementing hasn't happened yet in SubMaster |
||||
if frame == 0 or (msg.which() == 'cameraOdometry' and (frame % 5) == 0): |
||||
recv_socks = ["liveCalibration"] |
||||
return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry' |
||||
|
||||
CONFIGS = [ |
||||
ProcessConfig( |
||||
proc_name="controlsd", |
||||
pub_sub={ |
||||
"can": ["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"], |
||||
"thermal": [], "health": [], "liveCalibration": [], "dMonitoringState": [], "plan": [], "pathPlan": [], "gpsLocation": [], "liveLocationKalman": [], |
||||
"model": [], "frame": [], |
||||
}, |
||||
ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"], |
||||
init_callback=fingerprint, |
||||
should_recv_callback=None, |
||||
tolerance=None, |
||||
), |
||||
ProcessConfig( |
||||
proc_name="radard", |
||||
pub_sub={ |
||||
"can": ["radarState", "liveTracks"], |
||||
"liveParameters": [], "controlsState": [], "model": [], |
||||
}, |
||||
ignore=["logMonoTime", "valid", "radarState.cumLagMs"], |
||||
init_callback=get_car_params, |
||||
should_recv_callback=radar_rcv_callback, |
||||
tolerance=None, |
||||
), |
||||
ProcessConfig( |
||||
proc_name="plannerd", |
||||
pub_sub={ |
||||
"model": ["pathPlan"], "radarState": ["plan"], |
||||
"carState": [], "controlsState": [], "liveParameters": [], |
||||
}, |
||||
ignore=["logMonoTime", "valid", "plan.processingDelay"], |
||||
init_callback=get_car_params, |
||||
should_recv_callback=None, |
||||
tolerance=None, |
||||
), |
||||
ProcessConfig( |
||||
proc_name="calibrationd", |
||||
pub_sub={ |
||||
"carState": ["liveCalibration"], |
||||
"cameraOdometry": [] |
||||
}, |
||||
ignore=["logMonoTime", "valid"], |
||||
init_callback=get_car_params, |
||||
should_recv_callback=calibration_rcv_callback, |
||||
tolerance=None, |
||||
), |
||||
ProcessConfig( |
||||
proc_name="dmonitoringd", |
||||
pub_sub={ |
||||
"driverState": ["dMonitoringState"], |
||||
"liveCalibration": [], "carState": [], "model": [], "gpsLocation": [], |
||||
}, |
||||
ignore=["logMonoTime", "valid"], |
||||
init_callback=get_car_params, |
||||
should_recv_callback=None, |
||||
tolerance=NUMPY_TOLERANCE, |
||||
), |
||||
ProcessConfig( |
||||
proc_name="locationd", |
||||
pub_sub={ |
||||
"cameraOdometry": ["liveLocationKalman"], |
||||
"sensorEvents": [], "gpsLocationExternal": [], "liveCalibration": [], "carState": [], |
||||
}, |
||||
ignore=["logMonoTime", "valid"], |
||||
init_callback=get_car_params, |
||||
should_recv_callback=None, |
||||
tolerance=NUMPY_TOLERANCE, |
||||
), |
||||
ProcessConfig( |
||||
proc_name="paramsd", |
||||
pub_sub={ |
||||
"liveLocationKalman": ["liveParameters"], |
||||
"carState": [] |
||||
}, |
||||
ignore=["logMonoTime", "valid"], |
||||
init_callback=get_car_params, |
||||
should_recv_callback=None, |
||||
tolerance=NUMPY_TOLERANCE, |
||||
), |
||||
ProcessConfig( |
||||
proc_name="ubloxd", |
||||
pub_sub={ |
||||
"ubloxRaw": ["ubloxGnss", "gpsLocationExternal"], |
||||
}, |
||||
ignore=[], |
||||
init_callback=get_car_params, |
||||
should_recv_callback=None, |
||||
tolerance=NUMPY_TOLERANCE, |
||||
), |
||||
] |
||||
def valgrindlauncher(arg, cwd): |
||||
os.chdir(cwd) |
||||
|
||||
# Run valgrind on a process |
||||
command = "valgrind " + arg |
||||
print(command) |
||||
output = os.popen(command) |
||||
while True: |
||||
s = output.read() |
||||
if s == "": |
||||
break |
||||
print(s) |
||||
|
||||
def replay_process(cfg, lr): |
||||
# sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] |
||||
pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can'] |
||||
|
||||
fsm = FakeSubMaster(pub_sockets) |
||||
# fpm = FakePubMaster(sub_sockets) |
||||
|
||||
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) |
||||
print("DUMPING1") |
||||
for x in all_msgs: |
||||
print(x.which()) |
||||
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] |
||||
print("DUMPING2") |
||||
for x in pub_msgs: |
||||
print(x) |
||||
params = Params() |
||||
params.clear_all() |
||||
params.manager_start() |
||||
params.put("OpenpilotEnabledToggle", "1") |
||||
params.put("Passive", "0") |
||||
params.put("CommunityFeaturesToggle", "1") |
||||
|
||||
os.environ['NO_RADAR_SLEEP'] = "1" |
||||
|
||||
thread = threading.Thread(target=valgrindlauncher, args=("./ubloxd ", "../locationd")) |
||||
thread.daemon = True |
||||
thread.start() |
||||
|
||||
if cfg.init_callback is not None: |
||||
if 'can' not in list(cfg.pub_sub.keys()): |
||||
can_sock = None |
||||
cfg.init_callback(all_msgs, fsm, can_sock) |
||||
print("Got to here1") |
||||
# CP = car.CarParams.from_bytes(params.get("CarParams", block=True)) |
||||
print("Got to here2") |
||||
|
||||
log_msgs, msg_queue = [], [] |
||||
for msg in tqdm(pub_msgs): |
||||
print("MSG incoming(not a chemical, a message)") |
||||
print(msg) |
||||
|
||||
if msg.which() == 'can': |
||||
can_sock.send(msg.as_builder().to_bytes()) |
||||
else: |
||||
msg_queue.append(msg.as_builder()) |
||||
|
||||
return log_msgs |
||||
|
||||
if __name__ == "__main__": |
||||
cfg = CONFIGS[-1] |
||||
lr = LogReader("https://commadataci.blob.core.windows.net/openpilotci/76b83eb0245de90e/2020-03-05--19-16-05/3/rlog.bz2") |
||||
print(str(cfg)) |
||||
response = replay_process(cfg, lr) |
||||
for x in response: |
||||
print(x) |
Loading…
Reference in new issue