|
|
|
@ -28,41 +28,27 @@ from openpilot.selfdrive.controls.lib.longitudinal_planner import get_accel_from |
|
|
|
|
class Action: |
|
|
|
|
accel: float # m/s^2 |
|
|
|
|
duration: float # seconds |
|
|
|
|
longControlState: CarControl.Actuators.LongControlState = CarControl.Actuators.LongControlState.pid |
|
|
|
|
|
|
|
|
|
def get_msgs(self): |
|
|
|
|
return [ |
|
|
|
|
(t, CarControl( |
|
|
|
|
enabled=True, |
|
|
|
|
longActive=True, |
|
|
|
|
actuators=CarControl.Actuators( |
|
|
|
|
accel=self.accel, |
|
|
|
|
longControlState=self.longControlState, |
|
|
|
|
), |
|
|
|
|
)) |
|
|
|
|
for t in np.linspace(0, self.duration, int(self.duration/DT_CTRL)) |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
|
|
class Maneuver: |
|
|
|
|
description: str |
|
|
|
|
actions: list[Action] |
|
|
|
|
# TODO: implement repeat |
|
|
|
|
repeat: int = 1 |
|
|
|
|
repeat: int = 0 |
|
|
|
|
initial_speed: float = 0. # m/s |
|
|
|
|
|
|
|
|
|
_active: bool = False |
|
|
|
|
_finished: bool = False |
|
|
|
|
_action_index: int = 0 |
|
|
|
|
_start_frame: int = 0 |
|
|
|
|
_action_frames: int = 0 |
|
|
|
|
_ready_cnt: int = 0 |
|
|
|
|
_active_frames: int = 0 |
|
|
|
|
_repeated: int = 0 |
|
|
|
|
|
|
|
|
|
def get_accel(self, v_ego: float, enabled: bool, standstill: bool, frame: int) -> float: |
|
|
|
|
def get_accel(self, v_ego: float, enabled: bool, standstill: bool) -> float: |
|
|
|
|
ready = abs(v_ego - self.initial_speed) < 0.4 and enabled and not standstill |
|
|
|
|
self._ready_cnt = (self._ready_cnt + 1) if ready else 0 |
|
|
|
|
|
|
|
|
|
if self._ready_cnt > (2. / DT_MDL): |
|
|
|
|
if self._ready_cnt > (3. / DT_MDL): |
|
|
|
|
self._active = True |
|
|
|
|
|
|
|
|
|
if not self._active: |
|
|
|
@ -70,14 +56,23 @@ class Maneuver: |
|
|
|
|
|
|
|
|
|
action = self.actions[self._action_index] |
|
|
|
|
|
|
|
|
|
self._active_frames += 1 |
|
|
|
|
|
|
|
|
|
if self._active_frames > (action.duration / DT_MDL): |
|
|
|
|
self._action_index += 1 |
|
|
|
|
self._active_frames = 0 |
|
|
|
|
if self._action_index == len(self.actions): |
|
|
|
|
self._action_frames += 1 |
|
|
|
|
|
|
|
|
|
# reached duration of action |
|
|
|
|
if self._action_frames > (action.duration / DT_MDL): |
|
|
|
|
# next action |
|
|
|
|
if self._action_index < len(self.actions) - 1: |
|
|
|
|
self._action_index += 1 |
|
|
|
|
self._action_frames = 0 # TODO rename action frames? |
|
|
|
|
# repeat maneuver |
|
|
|
|
elif self._repeated < self.repeat: |
|
|
|
|
self._repeated += 1 |
|
|
|
|
self._action_index = 0 |
|
|
|
|
self._action_frames = 0 |
|
|
|
|
self._active = False |
|
|
|
|
# finish maneuver |
|
|
|
|
else: |
|
|
|
|
self._finished = True |
|
|
|
|
return 0. |
|
|
|
|
|
|
|
|
|
return action.accel |
|
|
|
|
|
|
|
|
@ -85,12 +80,6 @@ class Maneuver: |
|
|
|
|
def finished(self): |
|
|
|
|
return self._finished |
|
|
|
|
|
|
|
|
|
# def get_msgs(self): |
|
|
|
|
# t0 = 0 |
|
|
|
|
# for action in self.actions: |
|
|
|
|
# for lt, msg in action.get_msgs(): |
|
|
|
|
# yield lt + t0, msg |
|
|
|
|
# t0 += lt |
|
|
|
|
|
|
|
|
|
MANEUVERS = [ |
|
|
|
|
Maneuver( |
|
|
|
@ -100,31 +89,31 @@ MANEUVERS = [ |
|
|
|
|
Action(1, 2), Action(-1, 2), |
|
|
|
|
Action(1, 2), Action(-1, 2), |
|
|
|
|
], |
|
|
|
|
repeat=2, |
|
|
|
|
repeat=1, |
|
|
|
|
initial_speed=0., |
|
|
|
|
), |
|
|
|
|
Maneuver( |
|
|
|
|
"brake step response: -1m/ss from 20mph", |
|
|
|
|
[Action(-1, 3)], |
|
|
|
|
repeat=3, |
|
|
|
|
repeat=2, |
|
|
|
|
initial_speed=20. * Conversions.MPH_TO_MS, |
|
|
|
|
), |
|
|
|
|
Maneuver( |
|
|
|
|
"brake step response: -4m/ss from 20mph", |
|
|
|
|
[Action(-4, 3)], |
|
|
|
|
repeat=3, |
|
|
|
|
repeat=2, |
|
|
|
|
initial_speed=20. * Conversions.MPH_TO_MS, |
|
|
|
|
), |
|
|
|
|
Maneuver( |
|
|
|
|
"gas step response: +1m/ss from 20mph", |
|
|
|
|
[Action(1, 3)], |
|
|
|
|
repeat=3, |
|
|
|
|
repeat=2, |
|
|
|
|
initial_speed=20. * Conversions.MPH_TO_MS, |
|
|
|
|
), |
|
|
|
|
Maneuver( |
|
|
|
|
"gas step response: +4m/ss from 20mph", |
|
|
|
|
[Action(4, 3)], |
|
|
|
|
repeat=3, |
|
|
|
|
repeat=2, |
|
|
|
|
initial_speed=20. * Conversions.MPH_TO_MS, |
|
|
|
|
), |
|
|
|
|
] |
|
|
|
@ -209,7 +198,7 @@ def main(): |
|
|
|
|
cs = sm['carState'] |
|
|
|
|
|
|
|
|
|
if maneuver is not None: |
|
|
|
|
accel = maneuver.get_accel(cs.vEgo, cs.cruiseState.enabled, cs.cruiseState.standstill, sm.frame) |
|
|
|
|
accel = maneuver.get_accel(cs.vEgo, cs.cruiseState.enabled, cs.cruiseState.standstill) |
|
|
|
|
|
|
|
|
|
longitudinalPlan.aTarget = accel |
|
|
|
|
longitudinalPlan.shouldStop = cs.vEgo < CP.vEgoStopping and accel < 0 # should_stop |
|
|
|
@ -224,7 +213,7 @@ def main(): |
|
|
|
|
assistance_send.valid = True |
|
|
|
|
pm.send('driverAssistance', assistance_send) |
|
|
|
|
|
|
|
|
|
print('finished?', maneuver.finished) |
|
|
|
|
print('finished?', maneuver is not None and maneuver.finished) |
|
|
|
|
print('aTarget:', longitudinalPlan.aTarget) |
|
|
|
|
|
|
|
|
|
if maneuver is not None and maneuver.finished: |
|
|
|
|