pull/33527/head
Shane Smiskol 10 months ago
parent 388f1c964e
commit d74e708dd6
  1. 176
      examples/longitudinal_profiles.py

@ -47,27 +47,62 @@ class Action:
class Maneuver: class Maneuver:
description: str description: str
actions: list[Action] actions: list[Action]
# TODO: implement repeat
repeat: int = 1 repeat: int = 1
initial_speed: float = 0. # m/s initial_speed: float = 0. # m/s
def get_msgs(self): _active: bool = False
t0 = 0 _finished: bool = False
for action in self.actions: _action_index: int = 0
for lt, msg in action.get_msgs(): _start_frame: int = 0
yield lt + t0, msg _ready_cnt: int = 0
t0 += lt _active_frames: int = 0
def get_accel(self, v_ego: float, enabled: bool, standstill: bool, frame: int) -> float:
ready = abs(v_ego - self.initial_speed) < 0.4 and enabled and not standstill
self._ready_cnt = (self._ready_cnt + 1) if ready else 0
if self._ready_cnt > (2. / DT_MDL):
self._active = True
if not self._active:
return self.initial_speed - v_ego
action = self.actions[self._action_index]
self._active_frames += 1
if self._active_frames > (action.duration / DT_MDL):
self._action_index += 1
self._active_frames = 0
if self._action_index == len(self.actions):
self._finished = True
return 0.
return action.accel
@property
def finished(self):
return self._finished
# def get_msgs(self):
# t0 = 0
# for action in self.actions:
# for lt, msg in action.get_msgs():
# yield lt + t0, msg
# t0 += lt
MANEUVERS = [ MANEUVERS = [
# Maneuver( Maneuver(
# "creep: alternate between +1m/ss and -1m/ss", "creep: alternate between +1m/ss and -1m/ss",
# [ [
# Action(1, 2), Action(-1, 2), Action(1, 2), Action(-1, 2),
# Action(1, 2), Action(-1, 2), Action(1, 2), Action(-1, 2),
# Action(1, 2), Action(-1, 2), Action(1, 2), Action(-1, 2),
# ], ],
# repeat=2, repeat=2,
# initial_speed=0., initial_speed=0.,
# ), ),
Maneuver( Maneuver(
"brake step response: -1m/ss from 20mph", "brake step response: -1m/ss from 20mph",
[Action(-1, 3)], [Action(-1, 3)],
@ -156,9 +191,6 @@ def main():
maneuvers = iter(MANEUVERS) maneuvers = iter(MANEUVERS)
maneuver = None maneuver = None
ready_cnt = 0
repeat = 0
maneuver_active = False
while True: while True:
sm.update() sm.update()
@ -168,55 +200,22 @@ def main():
if maneuver is None: if maneuver is None:
print('We are done!') print('We are done!')
# print(maneuver)
if maneuver is not None:
pass
plan_send = messaging.new_message('longitudinalPlan') plan_send = messaging.new_message('longitudinalPlan')
plan_send.valid = sm.all_checks() plan_send.valid = sm.all_checks()
longitudinalPlan = plan_send.longitudinalPlan longitudinalPlan = plan_send.longitudinalPlan
accel = 0
cs = sm['carState']
if maneuver is not None: if maneuver is not None:
# desired_ accel = maneuver.get_accel(cs.vEgo, cs.cruiseState.enabled, cs.cruiseState.standstill, sm.frame)
v_ego = sm['carState'].vEgo
cs = sm['carState']
ready = abs(v_ego - maneuver.initial_speed) < 0.4 and cs.cruiseState.enabled and not cs.cruiseState.standstill
ready_cnt = (ready_cnt + 1) if ready else 0
if ready_cnt > (2. / DT_MDL):
print('ready!!!')
maneuver_active = True
if maneuver_active:
accel = maneuver.actions[0].accel
diff = v_ego * - v_ego
# longitudinalPlan.speeds = [(v_ego + min(i / (CONTROL_N - 1), 1) * diff) for i in range(CONTROL_N)]
longitudinalPlan.speeds = [v_ego + accel for i in range(CONTROL_N)]
longitudinalPlan.accels = [accel] * CONTROL_N
longitudinalPlan.jerks = [0] * CONTROL_N
print('v_ego', sm['carState'].vEgo)
print('speeds:', longitudinalPlan.speeds)
print('accels:', longitudinalPlan.accels)
else:
accel = (maneuver.initial_speed - cs.vEgo)
diff = maneuver.initial_speed - v_ego
longitudinalPlan.speeds = [(v_ego + min(i / (CONTROL_N - 1), 1) * diff) for i in range(CONTROL_N)]
longitudinalPlan.accels = [accel] * CONTROL_N
longitudinalPlan.jerks = [0] * CONTROL_N
# print('v_ego', sm['carState'].vEgo)
# print('speeds:', longitudinalPlan.speeds)
# print('accels:', longitudinalPlan.accels)
a_target, should_stop = get_accel_from_plan(CP, longitudinalPlan.speeds, longitudinalPlan.accels)
longitudinalPlan.aTarget = accel longitudinalPlan.aTarget = accel
longitudinalPlan.shouldStop = should_stop longitudinalPlan.shouldStop = cs.vEgo < CP.vEgoStopping and accel < 0 # should_stop
print('aTarget:', longitudinalPlan.aTarget)
longitudinalPlan.allowBrake = True longitudinalPlan.allowBrake = True
longitudinalPlan.allowThrottle = True longitudinalPlan.allowThrottle = True
longitudinalPlan.hasLead = True longitudinalPlan.hasLead = True
pm.send('longitudinalPlan', plan_send) pm.send('longitudinalPlan', plan_send)
@ -225,66 +224,11 @@ def main():
assistance_send.valid = True assistance_send.valid = True
pm.send('driverAssistance', assistance_send) pm.send('driverAssistance', assistance_send)
print('finished?', maneuver.finished)
print('aTarget:', longitudinalPlan.aTarget)
print("\n\n") if maneuver is not None and maneuver.finished:
maneuver = None
maneuvers = MANEUVERS
if len(args.maneuvers):
maneuvers = [MANEUVERS[i-1] for i in set(args.maneuvers)]
logs = {}
rk = Ratekeeper(int(1./DT_CTRL))
for i, m in enumerate(maneuvers):
logs[m.description] = {}
print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'")
for run in range(m.repeat):
print(f"- run #{run}")
print("- setting up, engage cruise")
ready_cnt = 0
for _ in range(int(2*60./DT_CTRL)):
cs = p.read(strict=False)
cc = CarControl(
enabled=True,
longActive=True,
actuators=CarControl.Actuators(
accel=(m.initial_speed - cs.vEgo)*0.8,
longControlState=CarControl.Actuators.LongControlState.pid,
),
)
if m.initial_speed < 0.1:
cc.actuators.accel = -2
cc.actuators.longControlState = CarControl.Actuators.LongControlState.stopping
p.write(cc)
ready = cs.cruiseState.enabled and not cs.cruiseState.standstill and ((m.initial_speed - 0.6) < cs.vEgo < (m.initial_speed + 0.6))
ready_cnt = (ready_cnt+1) if ready else 0
if ready_cnt > (2./DT_CTRL):
break
rk.keep_time()
else:
print("ERROR: failed to setup")
continue
print("- executing maneuver")
logs[m.description][run] = defaultdict(list)
for t, cc in m.get_msgs():
cs = p.read()
p.write(cc)
logs[m.description][run]["t"].append(t)
to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators,
"carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState}
for k, v in to_log.items():
for k2, v2 in asdict(v).items():
logs[m.description][run][f"{k}.{k2}"].append(v2)
rk.keep_time()
# print("writing out report")
# with open('/tmp/logs.json', 'w') as f:
# import json
# json.dump(logs, f, indent=2)
# report(args, logs, p.CI.CP.carFingerprint)
if __name__ == "__main__": if __name__ == "__main__":

Loading…
Cancel
Save