#!/usr/bin/env python3
# type: ignore
import os
import time
import argparse
import signal
from collections import defaultdict
import cereal . messaging as messaging
from tools . lib . logreader import logreader_from_route_or_segment
def sigint_handler ( signal , frame ) :
exit ( 0 )
signal . signal ( signal . SIGINT , sigint_handler )
class SteeringAccuracyTool :
all_groups = { " germany " : ( 45 , " 45 - up m/s // 162 - up km/h // 101 - up mph " ) ,
" veryfast " : ( 35 , " 35 - 45 m/s // 126 - 162 km/h // 78 - 101 mph " ) ,
" fast " : ( 25 , " 25 - 35 m/s // 90 - 126 km/h // 56 - 78 mph " ) ,
" medium " : ( 15 , " 15 - 25 m/s // 54 - 90 km/h // 34 - 56 mph " ) ,
" slow " : ( 5 , " 5 - 15 m/s // 18 - 54 km/h // 11 - 34 mph " ) ,
" crawl " : ( 0 , " 0 - 5 m/s // 0 - 18 km/h // 0 - 11 mph " ) }
def __init__ ( self , args ) :
self . msg_cnt = 0
self . cnt = 0
self . total_error = 0
if args . group == " all " :
self . display_groups = self . all_groups . keys ( )
elif args . group in self . all_groups . keys ( ) :
self . display_groups = [ args . group ]
else :
raise ValueError ( " invalid speed group, see help " )
self . speed_group_stats = { }
for group in self . all_groups :
self . speed_group_stats [ group ] = defaultdict ( lambda : { ' err ' : 0 , " cnt " : 0 , " = " : 0 , " + " : 0 , " - " : 0 , " steer " : 0 , " limited " : 0 , " saturated " : 0 , " dpp " : 0 } )
def update ( self , sm ) :
self . msg_cnt + = 1
lateralControlState = sm [ ' controlsState ' ] . lateralControlState
control_type = list ( lateralControlState . to_dict ( ) . keys ( ) ) [ 0 ]
control_state = lateralControlState . __getattr__ ( control_type )
v_ego = sm [ ' carState ' ] . vEgo
active = sm [ ' controlsState ' ] . active
steer = sm [ ' carControl ' ] . actuatorsOutput . steer
standstill = sm [ ' carState ' ] . standstill
steer_limited = abs ( sm [ ' carControl ' ] . actuators . steer - sm [ ' carControl ' ] . actuatorsOutput . steer ) > 1e-2
overriding = sm [ ' carState ' ] . steeringPressed
changing_lanes = sm [ ' lateralPlan ' ] . laneChangeState != 0
d_path_points = sm [ ' lateralPlan ' ] . dPathPoints
# must be engaged, not at standstill, not overriding steering, and not changing lanes
if active and not standstill and not overriding and not changing_lanes :
self . cnt + = 1
# wait 5 seconds after engage / standstill / override / lane change
if self . cnt > = 500 :
actual_angle = control_state . steeringAngleDeg
desired_angle = control_state . steeringAngleDesiredDeg
# calculate error before rounding, then round for stats grouping
angle_error = abs ( desired_angle - actual_angle )
actual_angle = round ( actual_angle , 1 )
desired_angle = round ( desired_angle , 1 )
angle_error = round ( angle_error , 2 )
angle_abs = int ( abs ( round ( desired_angle , 0 ) ) )
for group , group_props in self . all_groups . items ( ) :
if v_ego > group_props [ 0 ] :
# collect stats
self . speed_group_stats [ group ] [ angle_abs ] [ " cnt " ] + = 1
self . speed_group_stats [ group ] [ angle_abs ] [ " err " ] + = angle_error
self . speed_group_stats [ group ] [ angle_abs ] [ " steer " ] + = abs ( steer )
if len ( d_path_points ) :
self . speed_group_stats [ group ] [ angle_abs ] [ " dpp " ] + = abs ( d_path_points [ 0 ] )
if steer_limited :
self . speed_group_stats [ group ] [ angle_abs ] [ " limited " ] + = 1
if control_state . saturated :
self . speed_group_stats [ group ] [ angle_abs ] [ " saturated " ] + = 1
if actual_angle == desired_angle :
self . speed_group_stats [ group ] [ angle_abs ] [ " = " ] + = 1
else :
if desired_angle == 0. :
overshoot = True
else :
overshoot = desired_angle < actual_angle if desired_angle > 0. else desired_angle > actual_angle
self . speed_group_stats [ group ] [ angle_abs ] [ " + " if overshoot else " - " ] + = 1
break
else :
self . cnt = 0
if self . msg_cnt % 100 == 0 :
print ( chr ( 27 ) + " [2J " )
if self . cnt != 0 :
print ( " COLLECTING ... \n " )
else :
print ( " DISABLED (not active, standstill, steering override, or lane change) \n " )
for group in self . display_groups :
if len ( self . speed_group_stats [ group ] ) > 0 :
print ( f " speed group: { group : 10s } { self . all_groups [ group ] [ 1 ] : >96s } " )
print ( f " { ' - ' * 118 } " )
for k in sorted ( self . speed_group_stats [ group ] . keys ( ) ) :
v = self . speed_group_stats [ group ] [ k ]
print ( f ' { k : #2 } ° | actuator: { int ( v [ " steer " ] / v [ " cnt " ] * 100 ) : #3 } % | error: { round ( v [ " err " ] / v [ " cnt " ] , 2 ) : 2.2f } ° | -: { int ( v [ " - " ] / v [ " cnt " ] * 100 ) : #3 } % | =: { int ( v [ " = " ] / v [ " cnt " ] * 100 ) : #3 } % | +: { int ( v [ " + " ] / v [ " cnt " ] * 100 ) : #3 } % | lim: { v [ " limited " ] : #5 } | sat: { v [ " saturated " ] : #5 } | path dev: { round ( v [ " dpp " ] / v [ " cnt " ] , 2 ) : 2.2f } m | total: { v [ " cnt " ] : #5 } ' )
print ( " " )
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( description = ' Steering accuracy measurement tool ' )
parser . add_argument ( ' --route ' , help = " route name " )
parser . add_argument ( ' --addr ' , default = ' 127.0.0.1 ' , help = " IP address for optional ZMQ listener, default to msgq " )
parser . add_argument ( ' --group ' , default = ' all ' , help = " speed group to display, [crawl|slow|medium|fast|veryfast|germany|all], default to all " )
parser . add_argument ( ' --cache ' , default = False , action = ' store_true ' , help = " use cached data, default to False " )
args = parser . parse_args ( )
if args . cache :
os . environ [ ' FILEREADER_CACHE ' ] = ' 1 '
tool = SteeringAccuracyTool ( args )
if args . route is not None :
print ( f " loading { args . route } ... " )
lr = logreader_from_route_or_segment ( args . route , sort_by_time = True )
sm = { }
for msg in lr :
if msg . which ( ) == ' carState ' :
sm [ ' carState ' ] = msg . carState
elif msg . which ( ) == ' carControl ' :
sm [ ' carControl ' ] = msg . carControl
elif msg . which ( ) == ' controlsState ' :
sm [ ' controlsState ' ] = msg . controlsState
elif msg . which ( ) == ' lateralPlan ' :
sm [ ' lateralPlan ' ] = msg . lateralPlan
if msg . which ( ) == ' carControl ' and ' carState ' in sm and ' controlsState ' in sm and ' lateralPlan ' in sm :
tool . update ( sm )
else :
if args . addr != " 127.0.0.1 " :
os . environ [ " ZMQ " ] = " 1 "
messaging . context = messaging . Context ( )
carControl = messaging . sub_sock ( ' carControl ' , addr = args . addr , conflate = True )
sm = messaging . SubMaster ( [ ' carState ' , ' carControl ' , ' controlsState ' , ' lateralPlan ' ] , addr = args . addr )
time . sleep ( 1 ) # Make sure all submaster data is available before going further
print ( " waiting for messages... " )
while messaging . recv_one ( carControl ) :
sm . update ( )
tool . update ( sm )