model replay: more consistent replay (#23237)

* first cleanup

* don't send calib and desire

* should always be valid

* little more
old-commit-hash: 66ee3e9ef9
commatwo_master
Adeeb Shihadeh 3 years ago committed by GitHub
parent b99c359d61
commit 24c730f083
  1. 130
      selfdrive/test/process_replay/model_replay.py

@ -2,9 +2,9 @@
import os import os
import sys import sys
import time import time
from typing import Any from collections import defaultdict
from tqdm import tqdm from tqdm import tqdm
from typing import Any
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error
@ -25,50 +25,23 @@ if TICI:
TEST_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36" TEST_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"
else: else:
TEST_ROUTE = "303055c0002aefd1|2021-11-22--18-36-32" TEST_ROUTE = "303055c0002aefd1|2021-11-22--18-36-32"
SEGMENT = 0
CACHE_DIR = os.getenv("CACHE_DIR", None) SEND_EXTRA_INPUTS = bool(os.getenv("SEND_EXTRA_INPUTS", "0"))
packet_from_camera = {"roadCameraState":"modelV2", "driverCameraState":"driverState"}
def get_log_fn(ref_commit): def get_log_fn(ref_commit):
return "%s_%s_%s.bz2" % (TEST_ROUTE, "model_tici" if TICI else "model", ref_commit) return "%s_%s_%s.bz2" % (TEST_ROUTE, "model_tici" if TICI else "model", ref_commit)
def replace_calib(msg, calib): def replace_calib(msg, calib):
msg = msg.as_builder() msg = msg.as_builder()
if calib is not None: if calib is not None:
msg.liveCalibration.extrinsicMatrix = get_view_frame_from_road_frame(*calib, 1.22).flatten().tolist() msg.liveCalibration.extrinsicMatrix = get_view_frame_from_road_frame(*calib, 1.22).flatten().tolist()
return msg return msg
def process_frame(msg, pm, sm, log_msgs, vipc_server, spinner, frs, frame_idxs, last_desire):
if msg.which() == "roadCameraState" and last_desire is not None:
dat = messaging.new_message('lateralPlan')
dat.lateralPlan.desire = last_desire
pm.send('lateralPlan', dat)
f = msg.as_builder()
pm.send(msg.which(), f)
img = frs[msg.which()].get(frame_idxs[msg.which()], pix_fmt="yuv420p")[0] def model_replay(lr, frs):
if msg.which == "roadCameraState":
vipc_server.send(VisionStreamType.VISION_STREAM_ROAD, img.flatten().tobytes(), f.roadCameraState.frameId,
f.roadCameraState.timestampSof, f.roadCameraState.timestampEof)
else:
vipc_server.send(VisionStreamType.VISION_STREAM_DRIVER, img.flatten().tobytes(), f.driverCameraState.frameId,
f.driverCameraState.timestampSof, f.driverCameraState.timestampEof)
with Timeout(seconds=15):
log_msgs.append(messaging.recv_one(sm.sock[packet_from_camera[msg.which()]]))
frame_idxs[msg.which()] += 1
if frame_idxs[msg.which()] >= frs[msg.which()].frame_count:
return None
update_spinner(spinner, frame_idxs['roadCameraState'], frs['roadCameraState'].frame_count,
frame_idxs['driverCameraState'], frs['driverCameraState'].frame_count)
return 0
def update_spinner(s, fidx, fcnt, didx, dcnt):
s.update("replaying models: road %d/%d, driver %d/%d" % (fidx, fcnt, didx, dcnt))
def model_replay(lr_list, frs):
spinner = Spinner() spinner = Spinner()
spinner.update("starting model replay") spinner.update("starting model replay")
@ -77,93 +50,110 @@ def model_replay(lr_list, frs):
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_DRIVER, 40, False, *(tici_d_frame_size if TICI else eon_d_frame_size)) vipc_server.create_buffers(VisionStreamType.VISION_STREAM_DRIVER, 40, False, *(tici_d_frame_size if TICI else eon_d_frame_size))
vipc_server.start_listener() vipc_server.start_listener()
pm = messaging.PubMaster(['roadCameraState', 'driverCameraState', 'liveCalibration', 'lateralPlan'])
sm = messaging.SubMaster(['modelV2', 'driverState']) sm = messaging.SubMaster(['modelV2', 'driverState'])
pm = messaging.PubMaster(['roadCameraState', 'driverCameraState', 'liveCalibration', 'lateralPlan'])
try: try:
managed_processes['modeld'].start() managed_processes['modeld'].start()
managed_processes['dmonitoringmodeld'].start() managed_processes['dmonitoringmodeld'].start()
time.sleep(5) time.sleep(2)
sm.update(1000) sm.update(1000)
last_desire = None
log_msgs = [] log_msgs = []
frame_idxs = dict.fromkeys(['roadCameraState','driverCameraState'], 0) last_desire = None
frame_idxs = defaultdict(lambda: 0)
cal = [msg for msg in lr if msg.which() == "liveCalibration"] # init modeld with valid calibration
for msg in cal[:5]: cal_msgs = [msg for msg in lr if msg.which() == "liveCalibration"]
pm.send(msg.which(), replace_calib(msg, None)) for _ in range(5):
pm.send(cal_msgs[0].which(), cal_msgs[0].as_builder())
time.sleep(0.1)
for msg in tqdm(lr_list): for msg in tqdm(lr):
if SEND_EXTRA_INPUTS:
if msg.which() == "liveCalibration": if msg.which() == "liveCalibration":
last_calib = list(msg.liveCalibration.rpyCalib) last_calib = list(msg.liveCalibration.rpyCalib)
pm.send(msg.which(), replace_calib(msg, last_calib)) pm.send(msg.which(), replace_calib(msg, last_calib))
elif msg.which() == "lateralPlan": elif msg.which() == "lateralPlan":
last_desire = msg.lateralPlan.desire last_desire = msg.lateralPlan.desire
elif msg.which() in ["roadCameraState", "driverCameraState"]: dat = messaging.new_message('lateralPlan')
ret = process_frame(msg, pm, sm, log_msgs, vipc_server, spinner, frs, frame_idxs, last_desire) dat.lateralPlan.desire = last_desire
if ret is None: pm.send('lateralPlan', dat)
if msg.which() in ["roadCameraState", "driverCameraState"]:
camera_state = getattr(msg, msg.which())
stream = VisionStreamType.VISION_STREAM_ROAD if msg.which() == "roadCameraState" else VisionStreamType.VISION_STREAM_DRIVER
img = frs[msg.which()].get(frame_idxs[msg.which()], pix_fmt="yuv420p")[0]
# send camera state and frame
pm.send(msg.which(), msg.as_builder())
vipc_server.send(stream, img.flatten().tobytes(), camera_state.frameId,
camera_state.timestampSof, camera_state.timestampEof)
# wait for a response
with Timeout(seconds=15):
packet_from_camera = {"roadCameraState": "modelV2", "driverCameraState": "driverState"}
log_msgs.append(messaging.recv_one(sm.sock[packet_from_camera[msg.which()]]))
frame_idxs[msg.which()] += 1
if frame_idxs[msg.which()] >= frs[msg.which()].frame_count:
break break
except KeyboardInterrupt: spinner.update("replaying models: road %d/%d, driver %d/%d" % (frame_idxs['roadCameraState'],
pass frs['roadCameraState'].frame_count, frame_idxs['driverCameraState'], frs['driverCameraState'].frame_count))
finally: finally:
spinner.close() spinner.close()
managed_processes['modeld'].stop() managed_processes['modeld'].stop()
managed_processes['dmonitoringmodeld'].stop() managed_processes['dmonitoringmodeld'].stop()
return log_msgs return log_msgs
if __name__ == "__main__": if __name__ == "__main__":
update = "--update" in sys.argv update = "--update" in sys.argv
if TICI:
os.system('sudo mount -o remount,size=200M /tmp') # c3 hevcs are 75M each
replay_dir = os.path.dirname(os.path.abspath(__file__)) replay_dir = os.path.dirname(os.path.abspath(__file__))
ref_commit_fn = os.path.join(replay_dir, "model_replay_ref_commit") ref_commit_fn = os.path.join(replay_dir, "model_replay_ref_commit")
segnum = 0 # load logs
frs = {} lr = list(LogReader(get_url(TEST_ROUTE, SEGMENT)))
if CACHE_DIR: frs = {
lr = LogReader(os.path.join(CACHE_DIR, '%s--%d--rlog.bz2' % (TEST_ROUTE.replace('|', '_'), segnum))) 'roadCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, log_type="fcamera")),
frs['roadCameraState'] = FrameReader(os.path.join(CACHE_DIR, '%s--%d--fcamera.hevc' % (TEST_ROUTE.replace('|', '_'), segnum))) 'driverCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, log_type="dcamera")),
frs['driverCameraState'] = FrameReader(os.path.join(CACHE_DIR, '%s--%d--dcamera.hevc' % (TEST_ROUTE.replace('|', '_'), segnum))) }
else:
lr = LogReader(get_url(TEST_ROUTE, segnum))
frs['roadCameraState'] = FrameReader(get_url(TEST_ROUTE, segnum, log_type="fcamera"))
frs['driverCameraState'] = FrameReader(get_url(TEST_ROUTE, segnum, log_type="dcamera"))
lr_list = list(lr) # run replay
log_msgs = model_replay(lr_list, frs) log_msgs = model_replay(lr, frs)
# get diff
failed = False failed = False
if not update: if not update:
ref_commit = open(ref_commit_fn).read().strip() with open(ref_commit_fn) as f:
ref_commit = f.read().strip()
log_fn = get_log_fn(ref_commit) log_fn = get_log_fn(ref_commit)
cmp_log = LogReader(BASE_URL + log_fn) cmp_log = LogReader(BASE_URL + log_fn)
ignore = ['logMonoTime', 'valid', ignore = [
'logMonoTime',
'modelV2.frameDropPerc', 'modelV2.frameDropPerc',
'modelV2.modelExecutionTime', 'modelV2.modelExecutionTime',
'driverState.modelExecutionTime', 'driverState.modelExecutionTime',
'driverState.dspExecutionTime'] 'driverState.dspExecutionTime'
]
tolerance = None if not PC else 1e-3 tolerance = None if not PC else 1e-3
results: Any = {TEST_ROUTE: {}} results: Any = {TEST_ROUTE: {}}
results[TEST_ROUTE]["models"] = compare_logs(cmp_log, log_msgs, tolerance=tolerance, ignore_fields=ignore) results[TEST_ROUTE]["models"] = compare_logs(cmp_log, log_msgs, tolerance=tolerance, ignore_fields=ignore)
diff1, diff2, failed = format_diff(results, ref_commit) diff1, diff2, failed = format_diff(results, ref_commit)
print(diff2) print(diff2)
print('-------------') print('-------------\n'*5)
print('-------------')
print('-------------')
print('-------------')
print('-------------')
print(diff1) print(diff1)
with open("model_diff.txt", "w") as f: with open("model_diff.txt", "w") as f:
f.write(diff2) f.write(diff2)
# upload new refs
if update or failed: if update or failed:
from selfdrive.test.openpilotci import upload_file from selfdrive.test.openpilotci import upload_file

Loading…
Cancel
Save