#!/usr/bin/env python3 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  argparse 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  numpy  as  np 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  matplotlib . pyplot  as  plt 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  typing  import  NamedTuple 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  openpilot . tools . lib . logreader  import  LogReader 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  openpilot . selfdrive . locationd . models . pose_kf  import  EARTH_G 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								RLOG_MIN_LAT_ACTIVE  =  50 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								RLOG_MIN_STEERING_UNPRESSED  =  50 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								RLOG_MIN_REQUESTING_MAX  =  25   # sample many times after reaching max torque 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								QLOG_DECIMATION  =  10 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								class  Event ( NamedTuple ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  lateral_accel :  float 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  speed :  float 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  roll :  float 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  timestamp :  float   # relative to start of route (s) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  find_events ( lr :  LogReader ,  qlog :  bool  =  False )  - >  list [ Event ] : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  min_lat_active  =  RLOG_MIN_LAT_ACTIVE  / /  QLOG_DECIMATION  if  qlog  else  RLOG_MIN_LAT_ACTIVE 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  min_steering_unpressed  =  RLOG_MIN_STEERING_UNPRESSED  / /  QLOG_DECIMATION  if  qlog  else  RLOG_MIN_STEERING_UNPRESSED 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  min_requesting_max  =  RLOG_MIN_REQUESTING_MAX  / /  QLOG_DECIMATION  if  qlog  else  RLOG_MIN_REQUESTING_MAX 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  events  =  [ ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  start_ts  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  # state tracking 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  steering_unpressed  =  0   # frames 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  requesting_max  =  0   # frames 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  lat_active  =  0   # frames 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  # current state 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  curvature  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  v_ego  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  roll  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  for  msg  in  lr : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  msg . which ( )  ==  ' carControl ' : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  start_ts  ==  0 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        start_ts  =  msg . logMonoTime 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      lat_active  =  lat_active  +  1  if  msg . carControl . latActive  else  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  msg . which ( )  ==  ' carOutput ' : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # if we test with driver torque safety, max torque can be slightly noisy 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      requesting_max  =  requesting_max  +  1  if  abs ( msg . carOutput . actuatorsOutput . torque )  >  0.95  else  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  msg . which ( )  ==  ' carState ' : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      steering_unpressed  =  steering_unpressed  +  1  if  not  msg . carState . steeringPressed  else  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      v_ego  =  msg . carState . vEgo 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  msg . which ( )  ==  ' controlsState ' : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      curvature  =  msg . controlsState . curvature 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  msg . which ( )  ==  ' liveParameters ' : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      roll  =  msg . liveParameters . roll 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  lat_active  >  min_lat_active  and  steering_unpressed  >  min_steering_unpressed  and  requesting_max  >  min_requesting_max : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # TODO: record max lat accel at the end of the event, need to use the past lat accel as overriding can happen before we detect it 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      requesting_max  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      current_lateral_accel  =  curvature  *  v_ego  * *  2  -  roll  *  EARTH_G 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      events . append ( Event ( current_lateral_accel ,  v_ego ,  roll ,  round ( ( msg . logMonoTime  -  start_ts )  *  1e-9 ,  2 ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      print ( events [ - 1 ] ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  events 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								if  __name__  ==  ' __main__ ' : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser  =  argparse . ArgumentParser ( description = " Find max lateral acceleration events " , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								                                   formatter_class = argparse . ArgumentDefaultsHelpFormatter ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( " route " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  args  =  parser . parse_args ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  lr  =  LogReader ( args . route ,  sort_by_time = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  qlog  =  args . route . endswith ( ' /q ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  if  qlog : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    print ( ' WARNING: Treating route as qlog! ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print ( ' Finding events... ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  events  =  find_events ( lr ,  qlog = qlog ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print ( f ' Found  { len ( events ) }  events ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  perc_left_accel  =  - np . percentile ( [ - ev . lateral_accel  for  ev  in  events  if  ev . lateral_accel  <  0 ] ,  90 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  perc_right_accel  =  np . percentile ( [ ev . lateral_accel  for  ev  in  events  if  ev . lateral_accel  >  0 ] ,  90 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  CP  =  lr . first ( ' carParams ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . ion ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . clf ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . suptitle ( f ' { CP . carFingerprint }  - Max lateral acceleration events ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . title ( args . route ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . scatter ( [ ev . speed  for  ev  in  events ] ,  [ ev . lateral_accel  for  ev  in  events ] ,  label = ' max lateral accel events ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . plot ( [ 0 ,  35 ] ,  [ 3 ,  3 ] ,  c = ' r ' ,  label = ' ISO 11270 - 3 m/s^2 ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . plot ( [ 0 ,  35 ] ,  [ - 3 ,  - 3 ] ,  c = ' r ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . plot ( [ 0 ,  35 ] ,  [ perc_left_accel ,  perc_left_accel ] ,  c = ' g ' ,  linestyle = ' -- ' ,  label = ' 90th percentile left lateral accel ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . plot ( [ 0 ,  35 ] ,  [ perc_right_accel ,  perc_right_accel ] ,  c = ' #ff7f0e ' ,  linestyle = ' -- ' ,  label = ' 90th percentile right lateral accel ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . text ( 0.4 ,  float ( perc_left_accel  +  0.4 ) ,  f ' { perc_left_accel : .2f }  m/s^2 ' ,  verticalalignment = ' center ' ,  fontsize = 12 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . text ( 0.4 ,  float ( perc_right_accel  -  0.4 ) ,  f ' { perc_right_accel : .2f }  m/s^2 ' ,  verticalalignment = ' center ' ,  fontsize = 12 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . xlim ( 0 ,  35 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . ylim ( - 5 ,  5 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . xlabel ( ' speed (m/s) ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . ylabel ( ' lateral acceleration (m/s^2) ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . legend ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  plt . show ( block = True )