#!/usr/bin/env python3 import io import os import time import base64 import argparse import numpy as np import matplotlib.pyplot as plt from collections import defaultdict from dataclasses import dataclass, asdict from pathlib import Path from cereal import messaging, car from opendbc.car.structs import CarControl from opendbc.car.common.conversions import Conversions from openpilot.common.realtime import DT_CTRL, DT_MDL, Ratekeeper from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N from openpilot.selfdrive.controls.lib.longitudinal_planner import get_accel_from_plan # TODOs # - support lateral maneuvers # - setup: show countdown? @dataclass class Action: accel: float # m/s^2 duration: float # seconds longControlState: CarControl.Actuators.LongControlState = CarControl.Actuators.LongControlState.pid def get_msgs(self): return [ (t, CarControl( enabled=True, longActive=True, actuators=CarControl.Actuators( accel=self.accel, longControlState=self.longControlState, ), )) for t in np.linspace(0, self.duration, int(self.duration/DT_CTRL)) ] @dataclass class Maneuver: description: str actions: list[Action] repeat: int = 1 initial_speed: float = 0. # m/s def get_msgs(self): t0 = 0 for action in self.actions: for lt, msg in action.get_msgs(): yield lt + t0, msg t0 += lt MANEUVERS = [ # Maneuver( # "creep: alternate between +1m/ss and -1m/ss", # [ # Action(1, 2), Action(-1, 2), # Action(1, 2), Action(-1, 2), # Action(1, 2), Action(-1, 2), # ], # repeat=2, # initial_speed=0., # ), Maneuver( "brake step response: -1m/ss from 20mph", [Action(-1, 3)], repeat=3, initial_speed=20. * Conversions.MPH_TO_MS, ), Maneuver( "brake step response: -4m/ss from 20mph", [Action(-4, 3)], repeat=3, initial_speed=20. * Conversions.MPH_TO_MS, ), Maneuver( "gas step response: +1m/ss from 20mph", [Action(1, 3)], repeat=3, initial_speed=20. * Conversions.MPH_TO_MS, ), Maneuver( "gas step response: +4m/ss from 20mph", [Action(4, 3)], repeat=3, initial_speed=20. * Conversions.MPH_TO_MS, ), ] def report(args, logs, fp): output_path = Path(__file__).resolve().parent / "longitudinal_reports" output_fn = args.output or output_path / f"{fp}_{time.strftime('%Y%m%d-%H_%M_%S')}.html" output_path.mkdir(exist_ok=True) with open(output_fn, "w") as f: f.write("

Longitudinal maneuver report

\n") f.write(f"

{fp}

\n") if args.desc: f.write(f"

{args.desc}

") for description, runs in logs.items(): f.write("
\n") f.write(f"

{description}

\n") for run, log in runs.items(): f.write(f"

Run #{int(run)+1}

\n") plt.rcParams['font.size'] = 40 fig = plt.figure(figsize=(30, 25)) ax = fig.subplots(4, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 3, 1, 1]}) ax[0].grid(linewidth=4) ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6) ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6) ax[0].set_ylabel('Acceleration (m/s^2)') #ax[0].set_ylim(-6.5, 6.5) ax[0].legend() ax[1].grid(linewidth=4) ax[1].plot(log["t"], log["carState.vEgo"], 'g', label='vEgo', linewidth=6) ax[1].set_ylabel('Velocity (m/s)') ax[1].legend() ax[2].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6) ax[3].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6) ax[3].plot(log["t"], log["carState.brakePressed"], label='brakePressed', linewidth=6) for i in (2, 3): ax[i].set_yticks([0, 1], minor=False) ax[i].set_ylim(-1, 2) ax[i].legend() ax[-1].set_xlabel("Time (s)") fig.tight_layout() buffer = io.BytesIO() fig.savefig(buffer, format='png') buffer.seek(0) f.write(f"\n") import json f.write(f"

{json.dumps(logs)}

") print(f"\nReport written to {output_fn}\n") def main(): params = Params() cloudlog.info("joystickd is waiting for CarParams") CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams) sm = messaging.SubMaster(['carState', 'controlsState', 'selfdriveState', 'modelV2'], poll='modelV2') pm = messaging.PubMaster(['longitudinalPlan', 'driverAssistance']) maneuvers = iter(MANEUVERS) maneuver = None ready_cnt = 0 repeat = 0 maneuver_active = False while True: sm.update() if maneuver is None: maneuver = next(maneuvers, None) if maneuver is None: print('We are done!') # print(maneuver) if maneuver is not None: pass plan_send = messaging.new_message('longitudinalPlan') plan_send.valid = sm.all_checks() longitudinalPlan = plan_send.longitudinalPlan if maneuver is not None: # desired_ v_ego = sm['carState'].vEgo cs = sm['carState'] ready = abs(v_ego - maneuver.initial_speed) < 0.4 and cs.cruiseState.enabled and not cs.cruiseState.standstill ready_cnt = (ready_cnt + 1) if ready else 0 if ready_cnt > (2. / DT_MDL): print('ready!!!') maneuver_active = True if maneuver_active: accel = maneuver.actions[0].accel diff = v_ego * - v_ego # longitudinalPlan.speeds = [(v_ego + min(i / (CONTROL_N - 1), 1) * diff) for i in range(CONTROL_N)] longitudinalPlan.speeds = [v_ego + accel for i in range(CONTROL_N)] longitudinalPlan.accels = [accel] * CONTROL_N longitudinalPlan.jerks = [0] * CONTROL_N print('v_ego', sm['carState'].vEgo) print('speeds:', longitudinalPlan.speeds) print('accels:', longitudinalPlan.accels) else: accel = (maneuver.initial_speed - cs.vEgo) diff = maneuver.initial_speed - v_ego longitudinalPlan.speeds = [(v_ego + min(i / (CONTROL_N - 1), 1) * diff) for i in range(CONTROL_N)] longitudinalPlan.accels = [accel] * CONTROL_N longitudinalPlan.jerks = [0] * CONTROL_N # print('v_ego', sm['carState'].vEgo) # print('speeds:', longitudinalPlan.speeds) # print('accels:', longitudinalPlan.accels) a_target, should_stop = get_accel_from_plan(CP, longitudinalPlan.speeds, longitudinalPlan.accels) longitudinalPlan.aTarget = accel longitudinalPlan.shouldStop = should_stop print('aTarget:', longitudinalPlan.aTarget) longitudinalPlan.allowBrake = True longitudinalPlan.allowThrottle = True longitudinalPlan.hasLead = True pm.send('longitudinalPlan', plan_send) assistance_send = messaging.new_message('driverAssistance') assistance_send.valid = True pm.send('driverAssistance', assistance_send) print("\n\n") maneuvers = MANEUVERS if len(args.maneuvers): maneuvers = [MANEUVERS[i-1] for i in set(args.maneuvers)] logs = {} rk = Ratekeeper(int(1./DT_CTRL)) for i, m in enumerate(maneuvers): logs[m.description] = {} print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'") for run in range(m.repeat): print(f"- run #{run}") print("- setting up, engage cruise") ready_cnt = 0 for _ in range(int(2*60./DT_CTRL)): cs = p.read(strict=False) cc = CarControl( enabled=True, longActive=True, actuators=CarControl.Actuators( accel=(m.initial_speed - cs.vEgo)*0.8, longControlState=CarControl.Actuators.LongControlState.pid, ), ) if m.initial_speed < 0.1: cc.actuators.accel = -2 cc.actuators.longControlState = CarControl.Actuators.LongControlState.stopping p.write(cc) ready = cs.cruiseState.enabled and not cs.cruiseState.standstill and ((m.initial_speed - 0.6) < cs.vEgo < (m.initial_speed + 0.6)) ready_cnt = (ready_cnt+1) if ready else 0 if ready_cnt > (2./DT_CTRL): break rk.keep_time() else: print("ERROR: failed to setup") continue print("- executing maneuver") logs[m.description][run] = defaultdict(list) for t, cc in m.get_msgs(): cs = p.read() p.write(cc) logs[m.description][run]["t"].append(t) to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators, "carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState} for k, v in to_log.items(): for k2, v2 in asdict(v).items(): logs[m.description][run][f"{k}.{k2}"].append(v2) rk.keep_time() # print("writing out report") # with open('/tmp/logs.json', 'w') as f: # import json # json.dump(logs, f, indent=2) # report(args, logs, p.CI.CP.carFingerprint) if __name__ == "__main__": main() exit() maneuver_help = "\n".join([f"{i+1}. {m.description}" for i, m in enumerate(MANEUVERS)]) parser = argparse.ArgumentParser(description="A tool for longitudinal control testing.", formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--desc', help="Extra description to include in report.") parser.add_argument('--output', help="Write out report to this file.", default=None) parser.add_argument('maneuvers', nargs='*', type=int, default=None, help=f'Deafult is all.\n{maneuver_help}') args = parser.parse_args() print(args) if "REPORT_TEST" in os.environ: with open(os.environ["REPORT_TEST"]) as f: import json logs = json.loads(f.read().split("none'>")[1].split('

')[0]) report(args, logs, "testing") exit() assert args.output is None or args.output.endswith(".html"), "Output filename must end with '.html'" main(args)