openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

160 lines
6.5 KiB

#!/usr/bin/env python3
# type: ignore
import os
import time
import argparse
import signal
from collections import defaultdict
import cereal.messaging as messaging
from openpilot.tools.lib.logreader import LogReader
def sigint_handler(signal, frame):
exit(0)
signal.signal(signal.SIGINT, sigint_handler)
class SteeringAccuracyTool:
all_groups = {"germany": (45, "45 - up m/s // 162 - up km/h // 101 - up mph"),
"veryfast": (35, "35 - 45 m/s // 126 - 162 km/h // 78 - 101 mph"),
"fast": (25, "25 - 35 m/s // 90 - 126 km/h // 56 - 78 mph"),
"medium": (15, "15 - 25 m/s // 54 - 90 km/h // 34 - 56 mph"),
"slow": (5, " 5 - 15 m/s // 18 - 54 km/h // 11 - 34 mph"),
"crawl": (0, " 0 - 5 m/s // 0 - 18 km/h // 0 - 11 mph")}
def __init__(self, args):
self.msg_cnt = 0
self.cnt = 0
self.total_error = 0
if args.group == "all":
self.display_groups = self.all_groups.keys()
elif args.group in self.all_groups.keys():
self.display_groups = [args.group]
else:
raise ValueError("invalid speed group, see help")
self.speed_group_stats = {}
for group in self.all_groups:
self.speed_group_stats[group] = defaultdict(lambda: {'err': 0, "cnt": 0, "=": 0, "+": 0, "-": 0, "steer": 0, "limited": 0, "saturated": 0, "dpp": 0})
def update(self, sm):
self.msg_cnt += 1
lateralControlState = sm['controlsState'].lateralControlState
control_type = list(lateralControlState.to_dict().keys())[0]
control_state = lateralControlState.__getattr__(control_type)
v_ego = sm['carState'].vEgo
active = sm['controlsState'].active
steer = sm['carControl'].actuatorsOutput.steer
standstill = sm['carState'].standstill
steer_limited = abs(sm['carControl'].actuators.steer - sm['carControl'].actuatorsOutput.steer) > 1e-2
overriding = sm['carState'].steeringPressed
changing_lanes = sm['modelV2'].meta.laneChangeState != 0
model_points = sm['modelV2'].position.y
# must be engaged, not at standstill, not overriding steering, and not changing lanes
if active and not standstill and not overriding and not changing_lanes:
self.cnt += 1
# wait 5 seconds after engage / standstill / override / lane change
if self.cnt >= 500:
actual_angle = control_state.steeringAngleDeg
desired_angle = control_state.steeringAngleDesiredDeg
# calculate error before rounding, then round for stats grouping
angle_error = abs(desired_angle - actual_angle)
actual_angle = round(actual_angle, 1)
desired_angle = round(desired_angle, 1)
angle_error = round(angle_error, 2)
angle_abs = int(abs(round(desired_angle, 0)))
for group, group_props in self.all_groups.items():
if v_ego > group_props[0]:
# collect stats
self.speed_group_stats[group][angle_abs]["cnt"] += 1
self.speed_group_stats[group][angle_abs]["err"] += angle_error
self.speed_group_stats[group][angle_abs]["steer"] += abs(steer)
if len(model_points):
self.speed_group_stats[group][angle_abs]["dpp"] += abs(model_points[0])
if steer_limited:
self.speed_group_stats[group][angle_abs]["limited"] += 1
if control_state.saturated:
self.speed_group_stats[group][angle_abs]["saturated"] += 1
if actual_angle == desired_angle:
self.speed_group_stats[group][angle_abs]["="] += 1
else:
if desired_angle == 0.:
overshoot = True
else:
overshoot = desired_angle < actual_angle if desired_angle > 0. else desired_angle > actual_angle
self.speed_group_stats[group][angle_abs]["+" if overshoot else "-"] += 1
break
else:
self.cnt = 0
if self.msg_cnt % 100 == 0:
print(chr(27) + "[2J")
if self.cnt != 0:
print("COLLECTING ...\n")
else:
print("DISABLED (not active, standstill, steering override, or lane change)\n")
for group in self.display_groups:
if len(self.speed_group_stats[group]) > 0:
print(f"speed group: {group:10s} {self.all_groups[group][1]:>96s}")
print(f" {'-'*118}")
for k in sorted(self.speed_group_stats[group].keys()):
v = self.speed_group_stats[group][k]
print(f' {k:#2}° | actuator:{int(v["steer"] / v["cnt"] * 100):#3}% ' +
f'| error: {round(v["err"] / v["cnt"], 2):2.2f}° | -:{int(v["-"] / v["cnt"] * 100):#3}% ' +
f'| =:{int(v["="] / v["cnt"] * 100):#3}% | +:{int(v["+"] / v["cnt"] * 100):#3}% | lim:{v["limited"]:#5} ' +
f'| sat:{v["saturated"]:#5} | path dev: {round(v["dpp"] / v["cnt"], 2):2.2f}m | total: {v["cnt"]:#5}')
print("")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Steering accuracy measurement tool')
parser.add_argument('--route', help="route name")
parser.add_argument('--addr', default='127.0.0.1', help="IP address for optional ZMQ listener, default to msgq")
parser.add_argument('--group', default='all', help="speed group to display, [crawl|slow|medium|fast|veryfast|germany|all], default to all")
parser.add_argument('--cache', default=False, action='store_true', help="use cached data, default to False")
args = parser.parse_args()
if args.cache:
os.environ['FILEREADER_CACHE'] = '1'
tool = SteeringAccuracyTool(args)
if args.route is not None:
print(f"loading {args.route}...")
lr = LogReader(args.route, sort_by_time=True)
sm = {}
for msg in lr:
if msg.which() == 'carState':
sm['carState'] = msg.carState
elif msg.which() == 'carControl':
sm['carControl'] = msg.carControl
elif msg.which() == 'controlsState':
sm['controlsState'] = msg.controlsState
elif msg.which() == 'modelV2':
sm['modelV2'] = msg.modelV2
if msg.which() == 'carControl' and 'carState' in sm and 'controlsState' in sm and 'modelV2' in sm:
tool.update(sm)
else:
if args.addr != "127.0.0.1":
os.environ["ZMQ"] = "1"
messaging.context = messaging.Context()
carControl = messaging.sub_sock('carControl', addr=args.addr, conflate=True)
sm = messaging.SubMaster(['carState', 'carControl', 'controlsState', 'modelV2'], addr=args.addr)
time.sleep(1) # Make sure all submaster data is available before going further
print("waiting for messages...")
while messaging.recv_one(carControl):
sm.update()
tool.update(sm)