|
|
|
@ -21,14 +21,15 @@ class Event(NamedTuple): |
|
|
|
|
timestamp: float # relative to start of route (s) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_events(lr: LogReader, qlog: bool = False) -> list[Event]: |
|
|
|
|
def find_events(lr: LogReader, extrapolate: bool = False, qlog: bool = False) -> list[Event]: |
|
|
|
|
min_lat_active = RLOG_MIN_LAT_ACTIVE // QLOG_DECIMATION if qlog else RLOG_MIN_LAT_ACTIVE |
|
|
|
|
min_steering_unpressed = RLOG_MIN_STEERING_UNPRESSED // QLOG_DECIMATION if qlog else RLOG_MIN_STEERING_UNPRESSED |
|
|
|
|
min_requesting_max = RLOG_MIN_REQUESTING_MAX // QLOG_DECIMATION if qlog else RLOG_MIN_REQUESTING_MAX |
|
|
|
|
|
|
|
|
|
events = [] |
|
|
|
|
# if we test with driver torque safety, max torque can be slightly noisy |
|
|
|
|
steer_threshold = 0.7 if extrapolate else 0.95 |
|
|
|
|
|
|
|
|
|
start_ts = 0 |
|
|
|
|
events = [] |
|
|
|
|
|
|
|
|
|
# state tracking |
|
|
|
|
steering_unpressed = 0 # frames |
|
|
|
@ -39,7 +40,9 @@ def find_events(lr: LogReader, qlog: bool = False) -> list[Event]: |
|
|
|
|
curvature = 0 |
|
|
|
|
v_ego = 0 |
|
|
|
|
roll = 0 |
|
|
|
|
out_torque = 0 |
|
|
|
|
|
|
|
|
|
start_ts = 0 |
|
|
|
|
for msg in lr: |
|
|
|
|
if msg.which() == 'carControl': |
|
|
|
|
if start_ts == 0: |
|
|
|
@ -48,8 +51,8 @@ def find_events(lr: LogReader, qlog: bool = False) -> list[Event]: |
|
|
|
|
lat_active = lat_active + 1 if msg.carControl.latActive else 0 |
|
|
|
|
|
|
|
|
|
elif msg.which() == 'carOutput': |
|
|
|
|
# if we test with driver torque safety, max torque can be slightly noisy |
|
|
|
|
requesting_max = requesting_max + 1 if abs(msg.carOutput.actuatorsOutput.torque) > 0.95 else 0 |
|
|
|
|
out_torque = msg.carOutput.actuatorsOutput.torque |
|
|
|
|
requesting_max = requesting_max + 1 if abs(out_torque) > steer_threshold else 0 |
|
|
|
|
|
|
|
|
|
elif msg.which() == 'carState': |
|
|
|
|
steering_unpressed = steering_unpressed + 1 if not msg.carState.steeringPressed else 0 |
|
|
|
@ -65,7 +68,8 @@ def find_events(lr: LogReader, qlog: bool = False) -> list[Event]: |
|
|
|
|
# TODO: record max lat accel at the end of the event, need to use the past lat accel as overriding can happen before we detect it |
|
|
|
|
requesting_max = 0 |
|
|
|
|
|
|
|
|
|
current_lateral_accel = curvature * v_ego ** 2 - roll * EARTH_G |
|
|
|
|
factor = 1 / abs(out_torque) |
|
|
|
|
current_lateral_accel = (curvature * v_ego ** 2 * factor) - roll * EARTH_G |
|
|
|
|
events.append(Event(current_lateral_accel, v_ego, roll, round((msg.logMonoTime - start_ts) * 1e-9, 2))) |
|
|
|
|
print(events[-1]) |
|
|
|
|
|
|
|
|
@ -77,6 +81,8 @@ if __name__ == '__main__': |
|
|
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter) |
|
|
|
|
|
|
|
|
|
parser.add_argument("route", nargs='+') |
|
|
|
|
parser.add_argument("-e", "--extrapolate", action="store_true", help="Extrapolates max lateral acceleration events linearly. " + |
|
|
|
|
"This option can be far less accurate.") |
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
|
events = [] |
|
|
|
@ -92,7 +98,7 @@ if __name__ == '__main__': |
|
|
|
|
print('WARNING: Treating route as qlog!') |
|
|
|
|
|
|
|
|
|
print('Finding events...') |
|
|
|
|
events += find_events(lr, qlog=qlog) |
|
|
|
|
events += find_events(lr, extrapolate=args.extrapolate, qlog=qlog) |
|
|
|
|
|
|
|
|
|
print() |
|
|
|
|
print(f'Found {len(events)} events') |
|
|
|
|