#!/usr/bin/env python3
# type: ignore
import os
import time
import argparse
import signal
from collections import defaultdict
import cereal . messaging as messaging
def sigint_handler ( signal , frame ) :
print ( " handler! " )
exit ( 0 )
signal . signal ( signal . SIGINT , sigint_handler )
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( description = ' Sniff a communication socket ' )
parser . add_argument ( ' control_type ' , help = " [pid|indi|lqr|angle] " )
parser . add_argument ( ' --addr ' , default = ' 127.0.0.1 ' , help = " IP address for optional ZMQ listener, default to msgq " )
parser . add_argument ( ' --group ' , default = ' all ' , help = " speed group to display, [crawl|slow|medium|fast|veryfast|germany|all], default to all " )
args = parser . parse_args ( )
if args . addr != " 127.0.0.1 " :
os . environ [ " ZMQ " ] = " 1 "
messaging . context = messaging . Context ( )
all_groups = { " germany " : ( 45 , " 45 - up m/s // 162 - up km/h // 101 - up mph " ) ,
" veryfast " : ( 35 , " 35 - 45 m/s // 126 - 162 km/h // 78 - 101 mph " ) ,
" fast " : ( 25 , " 25 - 35 m/s // 90 - 126 km/h // 56 - 78 mph " ) ,
" medium " : ( 15 , " 15 - 25 m/s // 54 - 90 km/h // 34 - 56 mph " ) ,
" slow " : ( 5 , " 5 - 15 m/s // 18 - 54 km/h // 11 - 34 mph " ) ,
" crawl " : ( 0 , " 0 - 5 m/s // 0 - 18 km/h // 0 - 11 mph " ) }
if args . group == " all " :
display_groups = all_groups . keys ( )
elif args . group in all_groups . keys ( ) :
display_groups = [ args . group ]
else :
raise ValueError ( " invalid speed group, see help " )
speed_group_stats = { }
for group in all_groups :
speed_group_stats [ group ] = defaultdict ( lambda : { ' err ' : 0 , " cnt " : 0 , " = " : 0 , " + " : 0 , " - " : 0 , " steer " : 0 , " limited " : 0 , " saturated " : 0 , " dpp " : 0 } )
carControl = messaging . sub_sock ( ' carControl ' , addr = args . addr , conflate = True )
sm = messaging . SubMaster ( [ ' carState ' , ' carControl ' , ' controlsState ' , ' lateralPlan ' ] , addr = args . addr )
time . sleep ( 1 ) # Make sure all submaster data is available before going further
msg_cnt = 0
cnt = 0
total_error = 0
while messaging . recv_one ( carControl ) :
sm . update ( )
msg_cnt + = 1
if args . control_type == " pid " :
control_state = sm [ ' controlsState ' ] . lateralControlState . pidState
elif args . control_type == " indi " :
control_state = sm [ ' controlsState ' ] . lateralControlState . indiState
elif args . control_type == " lqr " :
control_state = sm [ ' controlsState ' ] . lateralControlState . lqrState
elif args . control_type == " angle " :
control_state = sm [ ' controlsState ' ] . lateralControlState . angleState
else :
raise ValueError ( " invalid lateral control type, see help " )
v_ego = sm [ ' carState ' ] . vEgo
active = sm [ ' controlsState ' ] . active
steer = sm [ ' carControl ' ] . actuatorsOutput . steer
standstill = sm [ ' carState ' ] . standstill
steer_limited = abs ( sm [ ' carControl ' ] . actuators . steer - sm [ ' carControl ' ] . actuatorsOutput . steer ) > 1e-2
overriding = sm [ ' carState ' ] . steeringPressed
changing_lanes = sm [ ' lateralPlan ' ] . laneChangeState != 0
d_path_points = sm [ ' lateralPlan ' ] . dPathPoints
# must be engaged, not at standstill, not overriding steering, and not changing lanes
if active and not standstill and not overriding and not changing_lanes :
cnt + = 1
# wait 5 seconds after engage / standstill / override / lane change
if cnt > = 500 :
actual_angle = control_state . steeringAngleDeg
desired_angle = control_state . steeringAngleDesiredDeg
# calculate error before rounding, then round for stats grouping
angle_error = abs ( desired_angle - actual_angle )
actual_angle = round ( actual_angle , 1 )
desired_angle = round ( desired_angle , 1 )
angle_error = round ( angle_error , 2 )
angle_abs = int ( abs ( round ( desired_angle , 0 ) ) )
for group , group_props in all_groups . items ( ) :
if v_ego > group_props [ 0 ] :
# collect stats
speed_group_stats [ group ] [ angle_abs ] [ " cnt " ] + = 1
speed_group_stats [ group ] [ angle_abs ] [ " err " ] + = angle_error
speed_group_stats [ group ] [ angle_abs ] [ " steer " ] + = abs ( steer )
if len ( d_path_points ) :
speed_group_stats [ group ] [ angle_abs ] [ " dpp " ] + = abs ( d_path_points [ 0 ] )
if steer_limited :
speed_group_stats [ group ] [ angle_abs ] [ " limited " ] + = 1
if control_state . saturated :
speed_group_stats [ group ] [ angle_abs ] [ " saturated " ] + = 1
if actual_angle == desired_angle :
speed_group_stats [ group ] [ angle_abs ] [ " = " ] + = 1
else :
if desired_angle == 0. :
overshoot = True
else :
overshoot = desired_angle < actual_angle if desired_angle > 0. else desired_angle > actual_angle
speed_group_stats [ group ] [ angle_abs ] [ " + " if overshoot else " - " ] + = 1
break
else :
cnt = 0
if msg_cnt % 100 == 0 :
print ( chr ( 27 ) + " [2J " )
if cnt != 0 :
print ( " COLLECTING ... \n " )
else :
print ( " DISABLED (not active, standstill, steering override, or lane change) \n " )
for group in display_groups :
if len ( speed_group_stats [ group ] ) > 0 :
print ( f " speed group: { group : 10s } { all_groups [ group ] [ 1 ] : >96s } " )
print ( f " { ' - ' * 118 } " )
for k in sorted ( speed_group_stats [ group ] . keys ( ) ) :
v = speed_group_stats [ group ] [ k ]
print ( f ' { k : #2 } ° | actuator: { int ( v [ " steer " ] / v [ " cnt " ] * 100 ) : #3 } % | error: { round ( v [ " err " ] / v [ " cnt " ] , 2 ) : 2.2f } ° | -: { int ( v [ " - " ] / v [ " cnt " ] * 100 ) : #3 } % | =: { int ( v [ " = " ] / v [ " cnt " ] * 100 ) : #3 } % | +: { int ( v [ " + " ] / v [ " cnt " ] * 100 ) : #3 } % | lim: { v [ " limited " ] : #5 } | sat: { v [ " saturated " ] : #5 } | path dev: { round ( v [ " dpp " ] / v [ " cnt " ] , 2 ) : 2.2f } m | total: { v [ " cnt " ] : #5 } ' )
print ( " " )