import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  typing  import  Any 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  functools  import  cache 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . transformations . orientation  import  rot_from_euler ,  euler_from_rot 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								@cache 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  fft_next_good_size ( n :  int )  - >  int : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    smallest  composite  of  2 ,  3 ,  5 ,  7 ,  11  that  is  > =  n 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    inspired  by  pocketfft 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  n  < =  6 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  n 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    best ,  f2  =  2  *  n ,  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    while  f2  <  best : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        f23  =  f2 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        while  f23  <  best : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            f235  =  f23 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            while  f235  <  best : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                f2357  =  f235 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                while  f2357  <  best : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                    f235711  =  f2357 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                    while  f235711  <  best : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                        best  =  f235711  if  f235711  > =  n  else  best 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                        f235711  * =  11 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                    f2357  * =  7 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                f235  * =  5 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            f23  * =  3 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        f2  * =  2 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  best 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  parabolic_peak_interp ( R ,  max_index ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  max_index  ==  0  or  max_index  ==  len ( R )  -  1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  max_index 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  y_m1 ,  y_0 ,  y_p1  =  R [ max_index  -  1 ] ,  R [ max_index ] ,  R [ max_index  +  1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  offset  =  0.5  *  ( y_p1  -  y_m1 )  /  ( 2  *  y_0  -  y_p1  -  y_m1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  max_index  +  offset 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  rotate_cov ( rot_matrix ,  cov_in ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  rot_matrix  @  cov_in  @  rot_matrix . T 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  rotate_std ( rot_matrix ,  std_in ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  np . sqrt ( np . diag ( rotate_cov ( rot_matrix ,  np . diag ( std_in * * 2 ) ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  NPQueue : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  maxlen :  int ,  rowsize :  int )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . maxlen  =  maxlen 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . arr  =  np . empty ( ( 0 ,  rowsize ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __len__ ( self )  - >  int : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  len ( self . arr ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  append ( self ,  pt :  list [ float ] )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( self . arr )  <  self . maxlen : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . arr  =  np . append ( self . arr ,  [ pt ] ,  axis = 0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . arr [ : - 1 ]  =  self . arr [ 1 : ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . arr [ - 1 ]  =  pt 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  PointBuckets : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ,  x_bounds :  list [ tuple [ float ,  float ] ] ,  min_points :  list [ float ] ,  min_points_total :  int ,  points_per_bucket :  int ,  rowsize :  int )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . x_bounds  =  x_bounds 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . buckets  =  { bounds :  NPQueue ( maxlen = points_per_bucket ,  rowsize = rowsize )  for  bounds  in  x_bounds } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . buckets_min_points  =  dict ( zip ( x_bounds ,  min_points ,  strict = True ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . min_points_total  =  min_points_total 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __len__ ( self )  - >  int : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  sum ( [ len ( v )  for  v  in  self . buckets . values ( ) ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  is_valid ( self )  - >  bool : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    individual_buckets_valid  =  all ( len ( v )  > =  min_pts  for  v ,  min_pts  in  zip ( self . buckets . values ( ) ,  self . buckets_min_points . values ( ) ,  strict = True ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    total_points_valid  =  self . __len__ ( )  > =  self . min_points_total 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  individual_buckets_valid  and  total_points_valid 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  is_calculable ( self )  - >  bool : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  all ( len ( v )  >  0  for  v  in  self . buckets . values ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  add_point ( self ,  x :  float ,  y :  float )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  NotImplementedError 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  get_points ( self ,  num_points :  int  =  None )  - >  Any : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    points  =  np . vstack ( [ x . arr  for  x  in  self . buckets . values ( ) ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  num_points  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  points 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  points [ np . random . choice ( np . arange ( len ( points ) ) ,  min ( len ( points ) ,  num_points ) ,  replace = False ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  load_points ( self ,  points :  list [ list [ float ] ] )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  point  in  points : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . add_point ( * point ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  ParameterEstimator : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  """  Base class for parameter estimators  """ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  reset ( self )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  NotImplementedError 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  handle_log ( self ,  t :  int ,  which :  str ,  msg :  log . Event )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  NotImplementedError 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  get_msg ( self ,  valid :  bool ,  with_points :  bool )  - >  log . Event : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  NotImplementedError 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  Measurement : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  x ,  y ,  z  =  ( property ( lambda  self :  self . xyz [ 0 ] ) ,  property ( lambda  self :  self . xyz [ 1 ] ) ,  property ( lambda  self :  self . xyz [ 2 ] ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  x_std ,  y_std ,  z_std  =  ( property ( lambda  self :  self . xyz_std [ 0 ] ) ,  property ( lambda  self :  self . xyz_std [ 1 ] ) ,  property ( lambda  self :  self . xyz_std [ 2 ] ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  roll ,  pitch ,  yaw  =  x ,  y ,  z 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  roll_std ,  pitch_std ,  yaw_std  =  x_std ,  y_std ,  z_std 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  xyz :  np . ndarray ,  xyz_std :  np . ndarray ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . xyz :  np . ndarray  =  xyz 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . xyz_std :  np . ndarray  =  xyz_std 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  from_measurement_xyz ( cls ,  measurement :  log . LivePose . XYZMeasurement )  - >  ' Measurement ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  cls ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      xyz = np . array ( [ measurement . x ,  measurement . y ,  measurement . z ] ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      xyz_std = np . array ( [ measurement . xStd ,  measurement . yStd ,  measurement . zStd ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  Pose : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  orientation :  Measurement ,  velocity :  Measurement ,  acceleration :  Measurement ,  angular_velocity :  Measurement ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . orientation  =  orientation 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . velocity  =  velocity 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . acceleration  =  acceleration 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . angular_velocity  =  angular_velocity 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  from_live_pose ( cls ,  live_pose :  log . LivePose )  - >  ' Pose ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  Pose ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      orientation = Measurement . from_measurement_xyz ( live_pose . orientationNED ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      velocity = Measurement . from_measurement_xyz ( live_pose . velocityDevice ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      acceleration = Measurement . from_measurement_xyz ( live_pose . accelerationDevice ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      angular_velocity = Measurement . from_measurement_xyz ( live_pose . angularVelocityDevice ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  PoseCalibrator : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . calib_valid  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . calib_from_device  =  np . eye ( 3 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _transform_calib_from_device ( self ,  meas :  Measurement ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    new_xyz  =  self . calib_from_device  @  meas . xyz 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    new_xyz_std  =  rotate_std ( self . calib_from_device ,  meas . xyz_std ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  Measurement ( new_xyz ,  new_xyz_std ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _ned_from_calib ( self ,  orientation :  Measurement ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ned_from_device  =  rot_from_euler ( orientation . xyz ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ned_from_calib  =  ned_from_device  @  self . calib_from_device . T 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ned_from_calib_euler_meas  =  Measurement ( euler_from_rot ( ned_from_calib ) ,  np . full ( 3 ,  np . nan ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  ned_from_calib_euler_meas 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  build_calibrated_pose ( self ,  pose :  Pose )  - >  Pose : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ned_from_calib_euler  =  self . _ned_from_calib ( pose . orientation ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    angular_velocity_calib  =  self . _transform_calib_from_device ( pose . angular_velocity ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    acceleration_calib  =  self . _transform_calib_from_device ( pose . acceleration ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    velocity_calib  =  self . _transform_calib_from_device ( pose . angular_velocity ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  Pose ( ned_from_calib_euler ,  velocity_calib ,  acceleration_calib ,  angular_velocity_calib ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  feed_live_calib ( self ,  live_calib :  log . LiveCalibrationData ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    calib_rpy  =  np . array ( live_calib . rpyCalib ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    device_from_calib  =  rot_from_euler ( calib_rpy ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . calib_from_device  =  device_from_calib . T 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . calib_valid  =  live_calib . calStatus  ==  log . LiveCalibrationData . Status . calibrated