You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
168 lines
4.4 KiB
168 lines
4.4 KiB
#!/usr/bin/env python3
|
|
from dataclasses import dataclass
|
|
|
|
from cereal import messaging, car
|
|
from opendbc.car.common.conversions import Conversions
|
|
from openpilot.common.realtime import DT_MDL
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.swaglog import cloudlog
|
|
|
|
|
|
@dataclass
|
|
class Action:
|
|
accel: float # m/s^2
|
|
duration: float # seconds
|
|
|
|
|
|
@dataclass
|
|
class Maneuver:
|
|
description: str
|
|
actions: list[Action]
|
|
repeat: int = 0
|
|
initial_speed: float = 0. # m/s
|
|
|
|
_active: bool = False
|
|
_finished: bool = False
|
|
_action_index: int = 0
|
|
_action_frames: int = 0
|
|
_ready_cnt: int = 0
|
|
_repeated: int = 0
|
|
|
|
def get_accel(self, v_ego: float, enabled: bool, standstill: bool) -> float:
|
|
ready = abs(v_ego - self.initial_speed) < 0.4 and enabled and not standstill
|
|
self._ready_cnt = (self._ready_cnt + 1) if ready else 0
|
|
|
|
if self._ready_cnt > (3. / DT_MDL):
|
|
self._active = True
|
|
|
|
if not self._active:
|
|
return self.initial_speed - v_ego
|
|
|
|
action = self.actions[self._action_index]
|
|
|
|
self._action_frames += 1
|
|
|
|
# reached duration of action
|
|
if self._action_frames > (action.duration / DT_MDL):
|
|
# next action
|
|
if self._action_index < len(self.actions) - 1:
|
|
self._action_index += 1
|
|
self._action_frames = 0
|
|
# repeat maneuver
|
|
elif self._repeated < self.repeat:
|
|
self._repeated += 1
|
|
self._action_index = 0
|
|
self._action_frames = 0
|
|
self._active = False
|
|
# finish maneuver
|
|
else:
|
|
self._finished = True
|
|
|
|
return action.accel
|
|
|
|
@property
|
|
def finished(self):
|
|
return self._finished
|
|
|
|
@property
|
|
def active(self):
|
|
return self._active
|
|
|
|
|
|
MANEUVERS = [
|
|
Maneuver(
|
|
"creep: alternate between +1m/ss and -1m/ss",
|
|
[
|
|
Action(1, 2), Action(-1, 2),
|
|
Action(1, 2), Action(-1, 2),
|
|
Action(1, 2), Action(-1, 2),
|
|
],
|
|
repeat=1,
|
|
initial_speed=0.,
|
|
),
|
|
Maneuver(
|
|
"brake step response: -1m/ss from 20mph",
|
|
[Action(-1, 3)],
|
|
repeat=2,
|
|
initial_speed=20. * Conversions.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"brake step response: -4m/ss from 20mph",
|
|
[Action(-4, 3)],
|
|
repeat=2,
|
|
initial_speed=20. * Conversions.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"gas step response: +1m/ss from 20mph",
|
|
[Action(1, 3)],
|
|
repeat=2,
|
|
initial_speed=20. * Conversions.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"gas step response: +4m/ss from 20mph",
|
|
[Action(4, 3)],
|
|
repeat=2,
|
|
initial_speed=20. * Conversions.MPH_TO_MS,
|
|
),
|
|
]
|
|
|
|
|
|
def main():
|
|
params = Params()
|
|
cloudlog.info("joystickd is waiting for CarParams")
|
|
CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
|
|
|
|
sm = messaging.SubMaster(['carState', 'controlsState', 'selfdriveState', 'modelV2'], poll='modelV2')
|
|
pm = messaging.PubMaster(['longitudinalPlan', 'driverAssistance', 'alertDebug'])
|
|
|
|
maneuvers = iter(MANEUVERS)
|
|
maneuver = None
|
|
|
|
while True:
|
|
sm.update()
|
|
|
|
if maneuver is None:
|
|
maneuver = next(maneuvers, None)
|
|
|
|
if maneuver is None:
|
|
print('We are done!')
|
|
|
|
alert_msg = messaging.new_message('alertDebug')
|
|
alert_msg.valid = True
|
|
|
|
plan_send = messaging.new_message('longitudinalPlan')
|
|
plan_send.valid = sm.all_checks()
|
|
|
|
longitudinalPlan = plan_send.longitudinalPlan
|
|
accel = 0
|
|
cs = sm['carState']
|
|
|
|
if maneuver is not None:
|
|
accel = maneuver.get_accel(cs.vEgo, cs.cruiseState.enabled, cs.cruiseState.standstill)
|
|
|
|
if maneuver.active:
|
|
alert_msg.alertDebug.alertText1 = 'Maneuver: Active'
|
|
else:
|
|
alert_msg.alertDebug.alertText1 = f'Reaching Speed: {maneuver.initial_speed * Conversions.MS_TO_MPH:0.2f} mph'
|
|
alert_msg.alertDebug.alertText2 = f'Requesting {accel:0.2f} m/s^2'
|
|
pm.send('alertDebug', alert_msg)
|
|
|
|
longitudinalPlan.aTarget = accel
|
|
longitudinalPlan.shouldStop = cs.vEgo < CP.vEgoStopping and accel < 0 # should_stop
|
|
|
|
longitudinalPlan.allowBrake = True
|
|
longitudinalPlan.allowThrottle = True
|
|
longitudinalPlan.hasLead = True
|
|
|
|
pm.send('longitudinalPlan', plan_send)
|
|
|
|
assistance_send = messaging.new_message('driverAssistance')
|
|
assistance_send.valid = True
|
|
pm.send('driverAssistance', assistance_send)
|
|
|
|
# print('finished?', maneuver is not None and maneuver.finished)
|
|
# print('aTarget:', longitudinalPlan.aTarget)
|
|
# print('shouldStop:', longitudinalPlan.shouldStop)
|
|
|
|
if maneuver is not None and maneuver.finished:
|
|
maneuver = None
|
|
|