openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

99 lines
3.2 KiB

#!/usr/bin/env python3
import os
import sys
import multiprocessing
import subprocess
import argparse
from tempfile import NamedTemporaryFile
from common.basedir import BASEDIR
from selfdrive.test.process_replay.compare_logs import save_log
from tools.lib.route import Route
from tools.lib.logreader import LogReader
juggle_dir = os.path.dirname(os.path.realpath(__file__))
def load_segment(segment_name):
print(f"Loading {segment_name}")
if segment_name is None:
return []
try:
return list(LogReader(segment_name))
except ValueError as e:
print(f"Error parsing {segment_name}: {e}")
return []
def juggle_file(fn, dbc=None, layout=None):
env = os.environ.copy()
env["BASEDIR"] = BASEDIR
if dbc:
env["DBC_NAME"] = dbc
pj = os.getenv("PLOTJUGGLER_PATH", "plotjuggler")
extra_args = ""
if layout is not None:
extra_args += f'-l {layout}'
subprocess.call(f'{pj} --plugin_folders {os.path.join(juggle_dir, "bin")} -d {fn} {extra_args}', shell=True, env=env, cwd=juggle_dir)
def juggle_route(route_name, segment_number, qlog, layout):
if route_name.startswith("http://") or route_name.startswith("https://"):
logs = [route_name]
else:
r = Route(route_name)
logs = r.qlog_paths() if qlog else r.log_paths()
if segment_number is not None:
logs = logs[segment_number:segment_number+1]
if None in logs:
fallback_answer = input("At least one of the rlogs in this segment does not exist, would you like to use the qlogs? (y/n) : ")
if fallback_answer == 'y':
logs = r.qlog_paths()
if segment_number is not None:
logs = logs[segment_number:segment_number+1]
else:
print(f"Please try a different {'segment' if segment_number is not None else 'route'}")
return
all_data = []
pool = multiprocessing.Pool(24)
for d in pool.map(load_segment, logs):
all_data += d
# Infer DBC name from logs
dbc = None
for cp in [m for m in all_data if m.which() == 'carParams']:
try:
DBC = __import__(f"selfdrive.car.{cp.carParams.carName}.values", fromlist=['DBC']).DBC
dbc = DBC[cp.carParams.carFingerprint]['pt']
except (ImportError, KeyError, AttributeError):
pass
break
tempfile = NamedTemporaryFile(suffix='.rlog', dir=juggle_dir)
save_log(tempfile.name, all_data, compress=False)
del all_data
juggle_file(tempfile.name, dbc, layout)
def get_arg_parser():
parser = argparse.ArgumentParser(description="PlotJuggler plugin for reading rlogs",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--qlog", action="store_true", help="Use qlogs")
parser.add_argument("--layout", nargs='?', help="Run PlotJuggler with a pre-defined layout")
parser.add_argument("route_name", nargs='?', help="The name of the route that will be plotted.")
parser.add_argument("segment_number", type=int, nargs='?', help="The index of the segment that will be plotted")
return parser
if __name__ == "__main__":
arg_parser = get_arg_parser()
if len(sys.argv) == 1:
arg_parser.print_help()
sys.exit()
args = arg_parser.parse_args(sys.argv[1:])
juggle_route(args.route_name, args.segment_number, args.qlog, args.layout)