From d74e708dd6f7463140e4283b7be7c06b2e7467ff Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Wed, 11 Sep 2024 00:02:10 -0700 Subject: [PATCH] works --- examples/longitudinal_profiles.py | 178 ++++++++++-------------------- 1 file changed, 61 insertions(+), 117 deletions(-) diff --git a/examples/longitudinal_profiles.py b/examples/longitudinal_profiles.py index 6d0b6d2d44..a06a1cb00e 100755 --- a/examples/longitudinal_profiles.py +++ b/examples/longitudinal_profiles.py @@ -47,27 +47,62 @@ class Action: class Maneuver: description: str actions: list[Action] + # TODO: implement repeat repeat: int = 1 initial_speed: float = 0. # m/s - def get_msgs(self): - t0 = 0 - for action in self.actions: - for lt, msg in action.get_msgs(): - yield lt + t0, msg - t0 += lt + _active: bool = False + _finished: bool = False + _action_index: int = 0 + _start_frame: int = 0 + _ready_cnt: int = 0 + _active_frames: int = 0 + + def get_accel(self, v_ego: float, enabled: bool, standstill: bool, frame: int) -> float: + ready = abs(v_ego - self.initial_speed) < 0.4 and enabled and not standstill + self._ready_cnt = (self._ready_cnt + 1) if ready else 0 + + if self._ready_cnt > (2. / DT_MDL): + self._active = True + + if not self._active: + return self.initial_speed - v_ego + + action = self.actions[self._action_index] + + self._active_frames += 1 + + if self._active_frames > (action.duration / DT_MDL): + self._action_index += 1 + self._active_frames = 0 + if self._action_index == len(self.actions): + self._finished = True + return 0. + + return action.accel + + @property + def finished(self): + return self._finished + + # def get_msgs(self): + # t0 = 0 + # for action in self.actions: + # for lt, msg in action.get_msgs(): + # yield lt + t0, msg + # t0 += lt MANEUVERS = [ - # Maneuver( - # "creep: alternate between +1m/ss and -1m/ss", - # [ - # Action(1, 2), Action(-1, 2), - # Action(1, 2), Action(-1, 2), - # Action(1, 2), Action(-1, 2), - # ], - # repeat=2, - # initial_speed=0., - # ), + Maneuver( + "creep: alternate between +1m/ss and -1m/ss", + [ + Action(1, 2), Action(-1, 2), + Action(1, 2), Action(-1, 2), + Action(1, 2), Action(-1, 2), + ], + repeat=2, + initial_speed=0., + ), Maneuver( "brake step response: -1m/ss from 20mph", [Action(-1, 3)], @@ -156,9 +191,6 @@ def main(): maneuvers = iter(MANEUVERS) maneuver = None - ready_cnt = 0 - repeat = 0 - maneuver_active = False while True: sm.update() @@ -168,55 +200,22 @@ def main(): if maneuver is None: print('We are done!') - # print(maneuver) - - if maneuver is not None: - pass plan_send = messaging.new_message('longitudinalPlan') plan_send.valid = sm.all_checks() longitudinalPlan = plan_send.longitudinalPlan + accel = 0 + cs = sm['carState'] if maneuver is not None: - # desired_ - - v_ego = sm['carState'].vEgo - cs = sm['carState'] - - ready = abs(v_ego - maneuver.initial_speed) < 0.4 and cs.cruiseState.enabled and not cs.cruiseState.standstill - ready_cnt = (ready_cnt + 1) if ready else 0 - if ready_cnt > (2. / DT_MDL): - print('ready!!!') - maneuver_active = True - - if maneuver_active: - accel = maneuver.actions[0].accel - diff = v_ego * - v_ego - # longitudinalPlan.speeds = [(v_ego + min(i / (CONTROL_N - 1), 1) * diff) for i in range(CONTROL_N)] - longitudinalPlan.speeds = [v_ego + accel for i in range(CONTROL_N)] - longitudinalPlan.accels = [accel] * CONTROL_N - longitudinalPlan.jerks = [0] * CONTROL_N - print('v_ego', sm['carState'].vEgo) - print('speeds:', longitudinalPlan.speeds) - print('accels:', longitudinalPlan.accels) - else: - accel = (maneuver.initial_speed - cs.vEgo) - diff = maneuver.initial_speed - v_ego - longitudinalPlan.speeds = [(v_ego + min(i / (CONTROL_N - 1), 1) * diff) for i in range(CONTROL_N)] - longitudinalPlan.accels = [accel] * CONTROL_N - longitudinalPlan.jerks = [0] * CONTROL_N - # print('v_ego', sm['carState'].vEgo) - # print('speeds:', longitudinalPlan.speeds) - # print('accels:', longitudinalPlan.accels) - - a_target, should_stop = get_accel_from_plan(CP, longitudinalPlan.speeds, longitudinalPlan.accels) + accel = maneuver.get_accel(cs.vEgo, cs.cruiseState.enabled, cs.cruiseState.standstill, sm.frame) + longitudinalPlan.aTarget = accel - longitudinalPlan.shouldStop = should_stop - print('aTarget:', longitudinalPlan.aTarget) + longitudinalPlan.shouldStop = cs.vEgo < CP.vEgoStopping and accel < 0 # should_stop + longitudinalPlan.allowBrake = True longitudinalPlan.allowThrottle = True - longitudinalPlan.hasLead = True pm.send('longitudinalPlan', plan_send) @@ -225,66 +224,11 @@ def main(): assistance_send.valid = True pm.send('driverAssistance', assistance_send) + print('finished?', maneuver.finished) + print('aTarget:', longitudinalPlan.aTarget) - print("\n\n") - - maneuvers = MANEUVERS - if len(args.maneuvers): - maneuvers = [MANEUVERS[i-1] for i in set(args.maneuvers)] - - logs = {} - rk = Ratekeeper(int(1./DT_CTRL)) - for i, m in enumerate(maneuvers): - logs[m.description] = {} - print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'") - for run in range(m.repeat): - print(f"- run #{run}") - print("- setting up, engage cruise") - ready_cnt = 0 - for _ in range(int(2*60./DT_CTRL)): - cs = p.read(strict=False) - cc = CarControl( - enabled=True, - longActive=True, - actuators=CarControl.Actuators( - accel=(m.initial_speed - cs.vEgo)*0.8, - longControlState=CarControl.Actuators.LongControlState.pid, - ), - ) - if m.initial_speed < 0.1: - cc.actuators.accel = -2 - cc.actuators.longControlState = CarControl.Actuators.LongControlState.stopping - p.write(cc) - - ready = cs.cruiseState.enabled and not cs.cruiseState.standstill and ((m.initial_speed - 0.6) < cs.vEgo < (m.initial_speed + 0.6)) - ready_cnt = (ready_cnt+1) if ready else 0 - if ready_cnt > (2./DT_CTRL): - break - rk.keep_time() - else: - print("ERROR: failed to setup") - continue - - print("- executing maneuver") - logs[m.description][run] = defaultdict(list) - for t, cc in m.get_msgs(): - cs = p.read() - p.write(cc) - - logs[m.description][run]["t"].append(t) - to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators, - "carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState} - for k, v in to_log.items(): - for k2, v2 in asdict(v).items(): - logs[m.description][run][f"{k}.{k2}"].append(v2) - - rk.keep_time() - - # print("writing out report") - # with open('/tmp/logs.json', 'w') as f: - # import json - # json.dump(logs, f, indent=2) - # report(args, logs, p.CI.CP.carFingerprint) + if maneuver is not None and maneuver.finished: + maneuver = None if __name__ == "__main__":