openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

131 lines
5.6 KiB

#!/usr/bin/env python3
# type: ignore
import os
import time
import argparse
import signal
from collections import defaultdict
import cereal.messaging as messaging
def sigint_handler(signal, frame):
print("handler!")
exit(0)
signal.signal(signal.SIGINT, sigint_handler)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Sniff a communication socket')
parser.add_argument('control_type', help="[pid|indi|lqr|angle]")
parser.add_argument('--addr', default='127.0.0.1', help="IP address for optional ZMQ listener, default to msgq")
parser.add_argument('--group', default='all', help="speed group to display, [crawl|slow|medium|fast|veryfast|germany|all], default to all")
args = parser.parse_args()
if args.addr != "127.0.0.1":
os.environ["ZMQ"] = "1"
messaging.context = messaging.Context()
all_groups = {"germany": (45, "45 - up m/s // 162 - up km/h // 101 - up mph"),
"veryfast": (35, "35 - 45 m/s // 126 - 162 km/h // 78 - 101 mph"),
"fast": (25, "25 - 35 m/s // 90 - 126 km/h // 56 - 78 mph"),
"medium": (15, "15 - 25 m/s // 54 - 90 km/h // 34 - 56 mph"),
"slow": (5, " 5 - 15 m/s // 18 - 54 km/h // 11 - 34 mph"),
"crawl": (0, " 0 - 5 m/s // 0 - 18 km/h // 0 - 11 mph")}
if args.group == "all":
display_groups = all_groups.keys()
elif args.group in all_groups.keys():
display_groups = [args.group]
else:
raise ValueError("invalid speed group, see help")
speed_group_stats = {}
for group in all_groups:
speed_group_stats[group] = defaultdict(lambda: {'err': 0, "cnt": 0, "=": 0, "+": 0, "-": 0, "steer": 0, "limited": 0, "saturated": 0, "dpp": 0})
carControl = messaging.sub_sock('carControl', addr=args.addr, conflate=True)
sm = messaging.SubMaster(['carState', 'carControl', 'controlsState', 'lateralPlan'], addr=args.addr)
time.sleep(1) # Make sure all submaster data is available before going further
msg_cnt = 0
cnt = 0
total_error = 0
while messaging.recv_one(carControl):
sm.update()
msg_cnt += 1
if args.control_type == "pid":
control_state = sm['controlsState'].lateralControlState.pidState
elif args.control_type == "indi":
control_state = sm['controlsState'].lateralControlState.indiState
elif args.control_type == "lqr":
control_state = sm['controlsState'].lateralControlState.lqrState
elif args.control_type == "angle":
control_state = sm['controlsState'].lateralControlState.angleState
else:
raise ValueError("invalid lateral control type, see help")
v_ego = sm['carState'].vEgo
active = sm['controlsState'].active
steer = sm['carControl'].actuatorsOutput.steer
standstill = sm['carState'].standstill
steer_limited = abs(sm['carControl'].actuators.steer - sm['carControl'].actuatorsOutput.steer) > 1e-2
overriding = sm['carState'].steeringPressed
changing_lanes = sm['lateralPlan'].laneChangeState != 0
d_path_points = sm['lateralPlan'].dPathPoints
# must be engaged, not at standstill, not overriding steering, and not changing lanes
if active and not standstill and not overriding and not changing_lanes:
cnt += 1
# wait 5 seconds after engage / standstill / override / lane change
if cnt >= 500:
actual_angle = control_state.steeringAngleDeg
desired_angle = control_state.steeringAngleDesiredDeg
# calculate error before rounding, then round for stats grouping
angle_error = abs(desired_angle - actual_angle)
actual_angle = round(actual_angle, 1)
desired_angle = round(desired_angle, 1)
angle_error = round(angle_error, 2)
angle_abs = int(abs(round(desired_angle, 0)))
for group, group_props in all_groups.items():
if v_ego > group_props[0]:
# collect stats
speed_group_stats[group][angle_abs]["cnt"] += 1
speed_group_stats[group][angle_abs]["err"] += angle_error
speed_group_stats[group][angle_abs]["steer"] += abs(steer)
if len(d_path_points):
speed_group_stats[group][angle_abs]["dpp"] += abs(d_path_points[0])
if steer_limited:
speed_group_stats[group][angle_abs]["limited"] += 1
if control_state.saturated:
speed_group_stats[group][angle_abs]["saturated"] += 1
if actual_angle == desired_angle:
speed_group_stats[group][angle_abs]["="] += 1
else:
if desired_angle == 0.:
overshoot = True
else:
overshoot = desired_angle < actual_angle if desired_angle > 0. else desired_angle > actual_angle
speed_group_stats[group][angle_abs]["+" if overshoot else "-"] += 1
break
else:
cnt = 0
if msg_cnt % 100 == 0:
print(chr(27) + "[2J")
if cnt != 0:
print("COLLECTING ...\n")
else:
print("DISABLED (not active, standstill, steering override, or lane change)\n")
for group in display_groups:
if len(speed_group_stats[group]) > 0:
print(f"speed group: {group:10s} {all_groups[group][1]:>96s}")
print(f" {'-'*118}")
for k in sorted(speed_group_stats[group].keys()):
v = speed_group_stats[group][k]
print(f' {k:#2}° | actuator:{int(v["steer"] / v["cnt"] * 100):#3}% | error: {round(v["err"] / v["cnt"], 2):2.2f}° | -:{int(v["-"] / v["cnt"] * 100):#3}% | =:{int(v["="] / v["cnt"] * 100):#3}% | +:{int(v["+"] / v["cnt"] * 100):#3}% | lim:{v["limited"]:#5} | sat:{v["saturated"]:#5} | path dev: {round(v["dpp"] / v["cnt"], 2):2.2f}m | total: {v["cnt"]:#5}')
print("")