#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  sys 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  sympy  as  sp 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . selfdrive . locationd . models . constants  import  ObservationKind 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  rednose . helpers . ekf_sym  import  gen_code ,  EKF_sym 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  LaneKalman ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  name  =  ' lane ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @staticmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  generate_code ( generated_dir ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # make functions and jacobians with sympy 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    #  state variables 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    dim  =  6 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    state  =  sp . MatrixSymbol ( ' state ' ,  dim ,  1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    dd  =  sp . Symbol ( ' dd ' )   # WARNING: NOT TIME 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Time derivative of the state as a function of state 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    state_dot  =  sp . Matrix ( np . zeros ( ( dim ,  1 ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    state_dot [ : 3 , 0 ]  =  sp . Matrix ( state [ 3 : 6 , 0 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Basic descretization, 1st order intergrator 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Can be pretty bad if dt is big 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    f_sym  =  sp . Matrix ( state )  +  dd * state_dot 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Observation functions 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    h_lane_sym  =  sp . Matrix ( state [ : 3 , 0 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    obs_eqs  =  [ [ h_lane_sym ,  ObservationKind . LANE_PT ,  None ] ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    gen_code ( generated_dir ,  LaneKalman . name ,  f_sym ,  dd ,  state ,  obs_eqs ,  dim ,  dim ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  generated_dir ,  pt_std = 5 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # state 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # left and right lane centers in ecef 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # WARNING: this is not a temporal model 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # the 'time' in this kalman filter is 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # the distance traveled by the vehicle, 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # which should approximately be the 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # distance along the lane path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # a more logical parametrization 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # states 0-2 are ecef coordinates distance d 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # states 3-5  is the 3d "velocity" of the 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # lane in ecef (m/m). 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    x_initial  =  np . array ( [ 0 , 0 , 0 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                          0 , 0 , 0 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # state covariance 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    P_initial  =  np . diag ( [ 1e16 ,  1e16 ,  1e16 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                         1 * * 2 ,  1 * * 2 ,  1 * * 2 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # process noise 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    Q  =  np . diag ( [ 0.1 * * 2 ,  0.1 * * 2 ,  0.1 * * 2 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                 0.1 * * 2 ,  0.1 * * 2 ,  0.1 * 2 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . dim_state  =  len ( x_initial ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # init filter 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . filter  =  EKF_sym ( generated_dir ,  self . name ,  Q ,  x_initial ,  P_initial ,  x_initial . shape [ 0 ] ,  P_initial . shape [ 0 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . obs_noise  =  { ObservationKind . LANE_PT :  np . diag ( [ pt_std * * 2 ] * 3 ) } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  x ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . filter . state ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  P ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . filter . covs ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  predict ( self ,  t ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . filter . predict ( t ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  rts_smooth ( self ,  estimates ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . filter . rts_smooth ( estimates ,  norm_quats = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  init_state ( self ,  state ,  covs_diag = None ,  covs = None ,  filter_time = None ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  covs_diag  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      P  =  np . diag ( covs_diag ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  covs  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      P  =  covs 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      P  =  self . filter . covs ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . filter . init_state ( state ,  P ,  filter_time ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  predict_and_observe ( self ,  t ,  kind ,  data ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    data  =  np . atleast_2d ( data ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . filter . predict_and_update_batch ( t ,  kind ,  data ,  self . get_R ( kind ,  len ( data ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  get_R ( self ,  kind ,  n ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    obs_noise  =  self . obs_noise [ kind ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    dim  =  obs_noise . shape [ 0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    R  =  np . zeros ( ( n ,  dim ,  dim ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  i  in  range ( n ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      R [ i , : , : ]  =  obs_noise 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  R 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  generated_dir  =  sys . argv [ 2 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  LaneKalman . generate_code ( generated_dir )