#!/usr/bin/env python3 from dataclasses import dataclass from cereal import messaging, car from opendbc.car.common.conversions import Conversions from openpilot.common.realtime import DT_MDL from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog @dataclass class Action: accel: float # m/s^2 duration: float # seconds @dataclass class Maneuver: description: str actions: list[Action] repeat: int = 0 initial_speed: float = 0. # m/s _active: bool = False _finished: bool = False _action_index: int = 0 _action_frames: int = 0 _ready_cnt: int = 0 _repeated: int = 0 def get_accel(self, v_ego: float, enabled: bool, standstill: bool) -> float: ready = abs(v_ego - self.initial_speed) < 0.4 and enabled and not standstill self._ready_cnt = (self._ready_cnt + 1) if ready else 0 if self._ready_cnt > (3. / DT_MDL): self._active = True if not self._active: return self.initial_speed - v_ego action = self.actions[self._action_index] self._action_frames += 1 # reached duration of action if self._action_frames > (action.duration / DT_MDL): # next action if self._action_index < len(self.actions) - 1: self._action_index += 1 self._action_frames = 0 # repeat maneuver elif self._repeated < self.repeat: self._repeated += 1 self._action_index = 0 self._action_frames = 0 self._active = False # finish maneuver else: self._finished = True return action.accel @property def finished(self): return self._finished @property def active(self): return self._active MANEUVERS = [ Maneuver( "creep: alternate between +1m/ss and -1m/ss", [ Action(1, 2), Action(-1, 2), Action(1, 2), Action(-1, 2), Action(1, 2), Action(-1, 2), ], repeat=1, initial_speed=0., ), Maneuver( "brake step response: -1m/ss from 20mph", [Action(-1, 3)], repeat=2, initial_speed=20. * Conversions.MPH_TO_MS, ), Maneuver( "brake step response: -4m/ss from 20mph", [Action(-4, 3)], repeat=2, initial_speed=20. * Conversions.MPH_TO_MS, ), Maneuver( "gas step response: +1m/ss from 20mph", [Action(1, 3)], repeat=2, initial_speed=20. * Conversions.MPH_TO_MS, ), Maneuver( "gas step response: +4m/ss from 20mph", [Action(4, 3)], repeat=2, initial_speed=20. * Conversions.MPH_TO_MS, ), ] def main(): params = Params() cloudlog.info("joystickd is waiting for CarParams") CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams) sm = messaging.SubMaster(['carState', 'controlsState', 'selfdriveState', 'modelV2'], poll='modelV2') pm = messaging.PubMaster(['longitudinalPlan', 'driverAssistance', 'alertDebug']) maneuvers = iter(MANEUVERS) maneuver = None while True: sm.update() if maneuver is None: maneuver = next(maneuvers, None) if maneuver is None: print('We are done!') alert_msg = messaging.new_message('alertDebug') alert_msg.valid = True plan_send = messaging.new_message('longitudinalPlan') plan_send.valid = sm.all_checks() longitudinalPlan = plan_send.longitudinalPlan accel = 0 cs = sm['carState'] if maneuver is not None: accel = maneuver.get_accel(cs.vEgo, cs.cruiseState.enabled, cs.cruiseState.standstill) if maneuver.active: alert_msg.alertDebug.alertText1 = 'Maneuver: Active' else: alert_msg.alertDebug.alertText1 = f'Reaching Speed: {maneuver.initial_speed * Conversions.MS_TO_MPH:0.2f} mph' alert_msg.alertDebug.alertText2 = f'Requesting {accel:0.2f} m/s^2' pm.send('alertDebug', alert_msg) longitudinalPlan.aTarget = accel longitudinalPlan.shouldStop = cs.vEgo < CP.vEgoStopping and accel < 0 longitudinalPlan.allowBrake = True longitudinalPlan.allowThrottle = True longitudinalPlan.hasLead = True pm.send('longitudinalPlan', plan_send) assistance_send = messaging.new_message('driverAssistance') assistance_send.valid = True pm.send('driverAssistance', assistance_send) # print('finished?', maneuver is not None and maneuver.finished) # print('aTarget:', longitudinalPlan.aTarget) # print('shouldStop:', longitudinalPlan.shouldStop) if maneuver is not None and maneuver.finished: maneuver = None