replace unlogger.py with c++ replay (#22430)

pull/22435/head
Willem Melching 4 years ago committed by GitHub
parent 94afd0ea0f
commit e233d59e03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      tools/replay/README.md
  2. 81
      tools/replay/rqplot.py
  3. 500
      tools/replay/unlogger.py

@ -10,24 +10,26 @@ In order to replay specific route:
MOCK=1 selfdrive/boardd/tests/boardd_old.py MOCK=1 selfdrive/boardd/tests/boardd_old.py
# In another terminal: # In another terminal:
python replay/unlogger.py <route-name> <path-to-data-directory> selfdrive/ui/replay/replay <route-name>
``` ```
Replay driving data Replay driving data
------------- -------------
`unlogger.py` replays all the messages logged while running openpilot. `replay` replays all the messages logged while running openpilot.
Unlogger with remote data: Replay with remote data:
```bash ```bash
# Log in via browser # Log in via browser to have access to non public route
python lib/auth.py python lib/auth.py
# Start unlogger # Start replay
python replay/unlogger.py <route-name> selfdrive/ui/replay/replay <route-name>
# Example: # Example:
# python replay/unlogger.py '4cf7a6ad03080c90|2021-09-29--13-46-36' # selfdrive/ui/replay/replay '4cf7a6ad03080c90|2021-09-29--13-46-36'
# or use --demo to replay the default demo route:
# selfdrive/ui/replay/replay --demo
# In another terminal you can run a debug visualizer: # In another terminal you can run a debug visualizer:
python replay/ui.py # Define the environmental variable HORIZONTAL is the ui layout is too tall python replay/ui.py # Define the environmental variable HORIZONTAL is the ui layout is too tall
@ -40,30 +42,20 @@ cd selfdrive/ui && ./ui
## usage ## usage
``` bash ``` bash
$ ./unlogger.py -h $ selfdrive/ui/replay/replay -h
usage: unlogger.py [-h] [--no-loop] [--min | --enabled ENABLED] [--disabled DISABLED] [--tl PUBLISH_TIME_LENGTH] [--no-realtime] Usage: selfdrive/ui/replay/replay [options] route
[--no-interactive] [--bind-early] [--no-visionipc] [--start-time START_TIME]
[route_name] [data_dir] [address_mapping [address_mapping ...]]
Mock openpilot components by publishing logged messages. Mock openpilot components by publishing logged messages.
positional arguments: Options:
route_name The route whose messages will be published. (default: None) -h, --help Displays this help.
data_dir Path to directory in which log and camera files are located. (default: None) -a, --allow <allow> whitelist of services to send
address_mapping Pairs <service>=<zmq_addr> to publish <service> on <zmq_addr>. (default: None) -b, --block <block> blacklist of services to send
-s, --start <seconds> start from <seconds>
optional arguments: --demo use a demo route instead of providing your own
-h, --help show this help message and exit --dcam load driver camera
--no-loop Stop at the end of the replay. (default: False) --ecam load wide road camera
--min
--enabled ENABLED Arguments:
--disabled DISABLED route the drive to replay. find your drives at
--tl PUBLISH_TIME_LENGTH connect.comma.ai
Length of interval in event time for which messages should be published. (default: None)
--no-realtime Publish messages as quickly as possible instead of realtime. (default: True)
--no-interactive Disable interactivity. (default: True)
--bind-early Bind early to avoid dropping messages. (default: False)
--no-visionipc Do not output video over visionipc (default: False)
--start-time START_TIME
Seek to this absolute time (in seconds) upon starting playback. (default: 0.0)
``` ```

@ -1,81 +0,0 @@
#!/usr/bin/env python
# type: ignore
import sys
import matplotlib.pyplot as plt
import numpy as np
import cereal.messaging as messaging
import time
# tool to plot one or more signals live. Call ex:
#./rqplot.py log.carState.vEgo log.carState.aEgo
# TODO: can this tool consume 10x less cpu?
def recursive_getattr(x, name):
l = name.split('.')
if len(l) == 1:
return getattr(x, name)
else:
return recursive_getattr(getattr(x, l[0]), ".".join(l[1:]) )
if __name__ == "__main__":
poller = messaging.Poller()
services = []
fields = []
subs = []
values = []
plt.ion()
fig, ax = plt.subplots()
#fig = plt.figure(figsize=(10, 15))
#ax = fig.add_subplot(111)
ax.grid(True)
fig.canvas.draw()
subs_name = sys.argv[1:]
lines = []
x, y = [], []
LEN = 500
for i, sub in enumerate(subs_name):
sub_split = sub.split(".")
services.append(sub_split[0])
fields.append(".".join(sub_split[1:]))
subs.append(messaging.sub_sock(sub_split[0], poller))
x.append(np.ones(LEN)*np.nan)
y.append(np.ones(LEN)*np.nan)
lines.append(ax.plot(x[i], y[i])[0])
for l in lines:
l.set_marker("*")
cur_t = 0.
ax.legend(subs_name)
ax.set_xlabel('time [s]')
while 1:
print(1./(time.time() - cur_t))
cur_t = time.time()
for i, s in enumerate(subs):
msg = messaging.recv_sock(s)
#msg = messaging.recv_one_or_none(s)
if msg is not None:
x[i] = np.append(x[i], getattr(msg, 'logMonoTime') / 1e9)
x[i] = np.delete(x[i], 0)
y[i] = np.append(y[i], recursive_getattr(msg, subs_name[i]))
y[i] = np.delete(y[i], 0)
lines[i].set_xdata(x[i])
lines[i].set_ydata(y[i])
ax.relim()
ax.autoscale_view(True, scaley=True, scalex=True)
fig.canvas.blit(ax.bbox)
fig.canvas.flush_events()
# just a bit of wait to avoid 100% CPU usage
time.sleep(0.001)

@ -1,500 +0,0 @@
#!/usr/bin/env python3
import argparse
import os
import sys
import zmq
import time
import signal
import multiprocessing
from uuid import uuid4
from collections import namedtuple
from collections import deque
from datetime import datetime
from cereal import log as capnp_log
from cereal.services import service_list
from cereal.messaging import pub_sock, MultiplePublishersError
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error
from common import realtime
from common.transformations.camera import eon_f_frame_size, tici_f_frame_size
from tools.lib.kbhit import KBHit
from tools.lib.logreader import MultiLogIterator
from tools.lib.route import Route
from tools.lib.framereader import rgb24toyuv420
from tools.lib.route_framereader import RouteFrameReader
# Commands.
SetRoute = namedtuple("SetRoute", ("name", "start_time", "data_dir"))
SeekAbsoluteTime = namedtuple("SeekAbsoluteTime", ("secs",))
SeekRelativeTime = namedtuple("SeekRelativeTime", ("secs",))
TogglePause = namedtuple("TogglePause", ())
StopAndQuit = namedtuple("StopAndQuit", ())
VIPC_RGB = "rgb"
VIPC_YUV = "yuv"
class UnloggerWorker(object):
def __init__(self):
self._frame_reader = None
self._cookie = None
self._readahead = deque()
def run(self, commands_address, data_address, pub_types):
zmq.Context._instance = None
commands_socket = zmq.Context.instance().socket(zmq.PULL)
commands_socket.connect(commands_address)
data_socket = zmq.Context.instance().socket(zmq.PUSH)
data_socket.connect(data_address)
poller = zmq.Poller()
poller.register(commands_socket, zmq.POLLIN)
# We can't publish frames without roadEncodeIdx, so add when it's missing.
if "roadCameraState" in pub_types:
pub_types["roadEncodeIdx"] = None
# gc.set_debug(gc.DEBUG_LEAK | gc.DEBUG_OBJECTS | gc.DEBUG_STATS | gc.DEBUG_SAVEALL |
# gc.DEBUG_UNCOLLECTABLE)
# TODO: WARNING pycapnp leaks memory all over the place after unlogger runs for a while, gc
# pauses become huge because there are so many tracked objects solution will be to switch to new
# cython capnp
try:
route = None
while True:
while poller.poll(0.) or route is None:
cookie, cmd = commands_socket.recv_pyobj()
route = self._process_commands(cmd, route, pub_types)
# **** get message ****
self._read_logs(cookie, pub_types)
self._send_logs(data_socket)
finally:
if self._frame_reader is not None:
self._frame_reader.close()
data_socket.close()
commands_socket.close()
def _read_logs(self, cookie, pub_types):
fullHEVC = capnp_log.EncodeIndex.Type.fullHEVC
lr = self._lr
while len(self._readahead) < 1000:
route_time = lr.tell()
msg = next(lr)
typ = msg.which()
if typ not in pub_types:
continue
# **** special case certain message types ****
if typ == "roadEncodeIdx" and msg.roadEncodeIdx.type == fullHEVC:
# this assumes the roadEncodeIdx always comes before the frame
self._frame_id_lookup[
msg.roadEncodeIdx.frameId] = msg.roadEncodeIdx.segmentNum, msg.roadEncodeIdx.segmentId
#print "encode", msg.roadEncodeIdx.frameId, len(self._readahead), route_time
self._readahead.appendleft((typ, msg, route_time, cookie))
def _send_logs(self, data_socket):
while len(self._readahead) > 500:
typ, msg, route_time, cookie = self._readahead.pop()
smsg = msg.as_builder()
if typ == "roadCameraState":
frame_id = msg.roadCameraState.frameId
# Frame exists, make sure we have a framereader.
# load the frame readers as needed
s1 = time.time()
try:
img = self._frame_reader.get(frame_id, pix_fmt="rgb24")
except Exception:
img = None
fr_time = time.time() - s1
if fr_time > 0.05:
print("FRAME(%d) LAG -- %.2f ms" % (frame_id, fr_time*1000.0))
if img is not None:
extra = (smsg.roadCameraState.frameId, smsg.roadCameraState.timestampSof, smsg.roadCameraState.timestampEof)
# send YUV frame
if os.getenv("YUV") is not None:
img_yuv = rgb24toyuv420(img)
data_socket.send_pyobj((cookie, VIPC_YUV, msg.logMonoTime, route_time, extra), flags=zmq.SNDMORE)
data_socket.send(img_yuv.flatten().tobytes(), copy=False)
img = img[:, :, ::-1] # Convert RGB to BGR, which is what the camera outputs
img = img.flatten()
bts = img.tobytes()
smsg.roadCameraState.image = bts
# send RGB frame
data_socket.send_pyobj((cookie, VIPC_RGB, msg.logMonoTime, route_time, extra), flags=zmq.SNDMORE)
data_socket.send(bts, copy=False)
data_socket.send_pyobj((cookie, typ, msg.logMonoTime, route_time), flags=zmq.SNDMORE)
data_socket.send(smsg.to_bytes(), copy=False)
def _process_commands(self, cmd, route, pub_types):
seek_to = None
if route is None or (isinstance(cmd, SetRoute) and route.name != cmd.name):
seek_to = cmd.start_time
route = Route(cmd.name, cmd.data_dir)
self._lr = MultiLogIterator(route.log_paths(), wraparound=True)
if self._frame_reader is not None:
self._frame_reader.close()
if "roadCameraState" in pub_types or "roadEncodeIdx" in pub_types:
# reset frames for a route
self._frame_id_lookup = {}
self._frame_reader = RouteFrameReader(
route.camera_paths(), None, self._frame_id_lookup, readahead=True)
# always reset this on a seek
if isinstance(cmd, SeekRelativeTime):
seek_to = self._lr.tell() + cmd.secs
elif isinstance(cmd, SeekAbsoluteTime):
seek_to = cmd.secs
elif isinstance(cmd, StopAndQuit):
exit()
if seek_to is not None:
print("seeking", seek_to)
if not self._lr.seek(seek_to):
print("Can't seek: time out of bounds")
else:
next(self._lr) # ignore one
return route
def _get_address_send_func(address):
sock = pub_sock(address)
return sock.send
def _get_vipc_server(length):
sizes = {3 * w * h: (w, h) for (w, h) in [tici_f_frame_size, eon_f_frame_size]} # RGB
sizes.update({(3 * w * h) / 2: (w, h) for (w, h) in [tici_f_frame_size, eon_f_frame_size]}) # YUV
w, h = sizes[length]
vipc_server = VisionIpcServer("camerad")
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_RGB_BACK, 4, True, w, h)
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_YUV_BACK, 40, False, w, h)
vipc_server.start_listener()
return vipc_server
def unlogger_thread(command_address, forward_commands_address, data_address, run_realtime,
address_mapping, publish_time_length, bind_early, no_loop, no_visionipc):
# Clear context to avoid problems with multiprocessing.
zmq.Context._instance = None
context = zmq.Context.instance()
command_sock = context.socket(zmq.PULL)
command_sock.bind(command_address)
forward_commands_socket = context.socket(zmq.PUSH)
forward_commands_socket.bind(forward_commands_address)
data_socket = context.socket(zmq.PULL)
data_socket.bind(data_address)
# Set readahead to a reasonable number.
data_socket.setsockopt(zmq.RCVHWM, 10000)
poller = zmq.Poller()
poller.register(command_sock, zmq.POLLIN)
poller.register(data_socket, zmq.POLLIN)
if bind_early:
send_funcs = {
typ: _get_address_send_func(address)
for typ, address in address_mapping.items()
}
# Give subscribers a chance to connect.
time.sleep(0.1)
else:
send_funcs = {}
start_time = float("inf")
printed_at = 0
generation = 0
paused = False
reset_time = True
prev_msg_time = None
vipc_server = None
while True:
evts = dict(poller.poll())
if command_sock in evts:
cmd = command_sock.recv_pyobj()
if isinstance(cmd, TogglePause):
paused = not paused
if paused:
poller.modify(data_socket, 0)
else:
poller.modify(data_socket, zmq.POLLIN)
else:
# Forward the command the the log data thread.
# TODO: Remove everything on data_socket.
generation += 1
forward_commands_socket.send_pyobj((generation, cmd))
if isinstance(cmd, StopAndQuit):
return
reset_time = True
elif data_socket in evts:
msg_generation, typ, msg_time, route_time, *extra = data_socket.recv_pyobj(flags=zmq.RCVMORE)
msg_bytes = data_socket.recv()
if msg_generation < generation:
# Skip packets.
continue
if no_loop and prev_msg_time is not None and prev_msg_time > msg_time + 1e9:
generation += 1
forward_commands_socket.send_pyobj((generation, StopAndQuit()))
return
prev_msg_time = msg_time
msg_time_seconds = msg_time * 1e-9
if reset_time:
msg_start_time = msg_time_seconds
real_start_time = realtime.sec_since_boot()
start_time = min(start_time, msg_start_time)
reset_time = False
if publish_time_length and msg_time_seconds - start_time > publish_time_length:
generation += 1
forward_commands_socket.send_pyobj((generation, StopAndQuit()))
return
# Print time.
if abs(printed_at - route_time) > 5.:
print("at", route_time)
printed_at = route_time
if typ not in send_funcs and typ not in [VIPC_RGB, VIPC_YUV]:
if typ in address_mapping:
# Remove so we don't keep printing warnings.
address = address_mapping.pop(typ)
try:
print("binding", typ)
send_funcs[typ] = _get_address_send_func(address)
except Exception as e:
print("couldn't replay {}: {}".format(typ, e))
continue
else:
# Skip messages that we are not registered to publish.
continue
# Sleep as needed for real time playback.
if run_realtime:
msg_time_offset = msg_time_seconds - msg_start_time
real_time_offset = realtime.sec_since_boot() - real_start_time
lag = msg_time_offset - real_time_offset
if lag > 0 and lag < 30: # a large jump is OK, likely due to an out of order segment
if lag > 1:
print("sleeping for", lag)
time.sleep(lag)
elif lag < -1:
# Relax the real time schedule when we slip far behind.
reset_time = True
# Send message.
try:
if typ in [VIPC_RGB, VIPC_YUV]:
if not no_visionipc:
if vipc_server is None:
vipc_server = _get_vipc_server(len(msg_bytes))
i, sof, eof = extra[0]
stream = VisionStreamType.VISION_STREAM_RGB_BACK if typ == VIPC_RGB else VisionStreamType.VISION_STREAM_YUV_BACK
vipc_server.send(stream, msg_bytes, i, sof, eof)
else:
send_funcs[typ](msg_bytes)
except MultiplePublishersError:
del send_funcs[typ]
def timestamp_to_s(tss):
return time.mktime(datetime.strptime(tss, '%Y-%m-%d--%H-%M-%S').timetuple())
def absolute_time_str(s, start_time):
try:
# first try if it's a float
return float(s)
except ValueError:
# now see if it's a timestamp
return timestamp_to_s(s) - start_time
def _get_address_mapping(args):
if args.min is not None:
services_to_mock = [
'deviceState', 'can', 'pandaState', 'sensorEvents', 'gpsNMEA', 'roadCameraState', 'roadEncodeIdx',
'modelV2', 'liveLocation',
]
elif args.enabled is not None:
services_to_mock = args.enabled
else:
services_to_mock = service_list.keys()
address_mapping = {service_name: service_name for service_name in services_to_mock}
address_mapping.update(dict(args.address_mapping))
for k in args.disabled:
address_mapping.pop(k, None)
non_services = set(address_mapping) - set(service_list)
if non_services:
print("WARNING: Unknown services {}".format(list(non_services)))
return address_mapping
def keyboard_controller_thread(q, route_start_time):
print("keyboard waiting for input")
kb = KBHit()
while 1:
c = kb.getch()
if c == 'm': # Move forward by 1m
q.send_pyobj(SeekRelativeTime(60))
elif c == 'M': # Move backward by 1m
q.send_pyobj(SeekRelativeTime(-60))
elif c == 's': # Move forward by 10s
q.send_pyobj(SeekRelativeTime(10))
elif c == 'S': # Move backward by 10s
q.send_pyobj(SeekRelativeTime(-10))
elif c == 'G': # Move backward by 10s
q.send_pyobj(SeekAbsoluteTime(0.))
elif c == "\x20": # Space bar.
q.send_pyobj(TogglePause())
elif c == "\n":
try:
seek_time_input = input('time: ')
seek_time = absolute_time_str(seek_time_input, route_start_time)
# If less than 60, assume segment number
if seek_time < 60:
seek_time *= 60
q.send_pyobj(SeekAbsoluteTime(seek_time))
except Exception as e:
print("Time not understood: {}".format(e))
def get_arg_parser():
parser = argparse.ArgumentParser(
description="Mock openpilot components by publishing logged messages.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("route_name", type=(lambda x: x.replace("#", "|")), nargs="?",
help="The route whose messages will be published.")
parser.add_argument("data_dir", nargs='?', default=os.getenv('UNLOGGER_DATA_DIR'),
help="Path to directory in which log and camera files are located.")
parser.add_argument("--no-loop", action="store_true", help="Stop at the end of the replay.")
def key_value_pair(x):
return x.split("=")
parser.add_argument("address_mapping", nargs="*", type=key_value_pair,
help="Pairs <service>=<zmq_addr> to publish <service> on <zmq_addr>.")
def comma_list(x):
return x.split(",")
to_mock_group = parser.add_mutually_exclusive_group()
to_mock_group.add_argument("--min", action="store_true", default=os.getenv("MIN"))
to_mock_group.add_argument("--enabled", default=os.getenv("ENABLED"), type=comma_list)
parser.add_argument("--disabled", type=comma_list, default=os.getenv("DISABLED") or ())
parser.add_argument(
"--tl", dest="publish_time_length", type=float, default=None,
help="Length of interval in event time for which messages should be published.")
parser.add_argument(
"--no-realtime", dest="realtime", action="store_false", default=True,
help="Publish messages as quickly as possible instead of realtime.")
parser.add_argument(
"--no-interactive", dest="interactive", action="store_false", default=True,
help="Disable interactivity.")
parser.add_argument(
"--bind-early", action="store_true", default=False,
help="Bind early to avoid dropping messages.")
parser.add_argument(
"--no-visionipc", action="store_true", default=False,
help="Do not output video over visionipc")
parser.add_argument(
"--start-time", type=float, default=0.,
help="Seek to this absolute time (in seconds) upon starting playback.")
return parser
def main(argv):
args = get_arg_parser().parse_args(sys.argv[1:])
command_address = "ipc:///tmp/{}".format(uuid4())
forward_commands_address = "ipc:///tmp/{}".format(uuid4())
data_address = "ipc:///tmp/{}".format(uuid4())
address_mapping = _get_address_mapping(args)
command_sock = zmq.Context.instance().socket(zmq.PUSH)
command_sock.connect(command_address)
if args.route_name is not None:
route_name_split = args.route_name.split("|")
if len(route_name_split) > 1:
route_start_time = timestamp_to_s(route_name_split[1])
else:
route_start_time = 0
command_sock.send_pyobj(
SetRoute(args.route_name, args.start_time, args.data_dir))
else:
print("waiting for external command...")
route_start_time = 0
subprocesses = {}
try:
subprocesses["data"] = multiprocessing.Process(
target=UnloggerWorker().run,
args=(forward_commands_address, data_address, address_mapping.copy()))
subprocesses["control"] = multiprocessing.Process(
target=unlogger_thread,
args=(command_address, forward_commands_address, data_address, args.realtime,
_get_address_mapping(args), args.publish_time_length, args.bind_early, args.no_loop, args.no_visionipc))
subprocesses["data"].start()
subprocesses["control"].start()
# Exit if any of the children die.
def exit_if_children_dead(*_):
for _, p in subprocesses.items():
if not p.is_alive():
[p.terminate() for p in subprocesses.values()]
exit()
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
signal.signal(signal.SIGCHLD, exit_if_children_dead)
if args.interactive:
keyboard_controller_thread(command_sock, route_start_time)
else:
# Wait forever for children.
while True:
time.sleep(10000.)
finally:
for p in subprocesses.values():
if p.is_alive():
try:
p.join(3.)
except multiprocessing.TimeoutError:
p.terminate()
continue
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
Loading…
Cancel
Save