#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  system . sensord . pigeond  as  pd 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  system . hardware  import  TICI 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . manager . process_config  import  managed_processes 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . test . helpers  import  with_processes 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def wait_for_location(sm, timeout, con=10):
								  """Poll ``sm`` until ``con`` consecutive position fixes arrive or time runs out.

								  Args:
								    sm: SubMaster-like object subscribed to "gnssMeasurements". It must
								      provide ``update()``, an ``updated`` mapping and item access by
								      service name.
								    timeout: maximum time to keep polling, compared against
								      ``time.monotonic()`` differences (seconds).
								    con: number of consecutive updated messages that must contain a
								      'positionECEF' entry.

								  Returns:
								    True as soon as the streak reaches ``con``; False if ``timeout``
								    elapses first.
								  """
								  cons_meas = 0
								  start_time = time.monotonic()
								  while (time.monotonic() - start_time) < timeout:
								    sm.update()
								    # only freshly received messages count towards the streak
								    if not sm.updated["gnssMeasurements"]:
								      continue

								    msg = sm["gnssMeasurements"]
								    # a message without a position resets the streak to zero
								    cons_meas = (cons_meas + 1) if 'positionECEF' in msg.to_dict() else 0
								    if cons_meas >= con:
								      return True
								  return False
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class TestLaikad(unittest.TestCase):
								  """Timing tests for laikad position output after a cold start.

								  The class is skipped unless TICI is truthy and the "UbloxAvailable"
								  param is set. Every test runs against a freshly started laikad process
								  with the stored ephemeris param removed.
								  """

								  @classmethod
								  def setUpClass(cls):
								    """Skip all tests unless TICI is set and the UbloxAvailable param is true."""
								    if not TICI:
								      raise unittest.SkipTest("TICI is not set")

								    ublox_available = Params().get_bool("UbloxAvailable")
								    if not ublox_available:
								      raise unittest.SkipTest("UbloxAvailable param is not set")

								  def setUp(self):
								    """Remove the stored ephemeris param, set LAIKAD_NO_INTERNET and start laikad."""
								    # ensure laikad cold start
								    Params().remove("LaikadEphemerisV2")
								    os.environ["LAIKAD_NO_INTERNET"] = "1"
								    managed_processes['laikad'].start()

								  def tearDown(self):
								    """Stop the laikad process started in setUp."""
								    managed_processes['laikad'].stop()

								  @with_processes(['pigeond', 'ubloxd'])
								  def test_laikad_cold_start(self):
								    """Expect 10 consecutive position messages in under 60 s.

								    The wait itself is allowed up to 2 minutes so that a slow fix and a
								    missing fix produce different failure messages.
								    """
								    # this initial sleep happens before the clock starts, so it is not
								    # included in the measured duration
								    time.sleep(5)

								    start_time = time.monotonic()
								    sm = messaging.SubMaster(["gnssMeasurements"])

								    success = wait_for_location(sm, 60*2, con=10)
								    duration = time.monotonic() - start_time

								    assert success, "Waiting for location timed out (2min)!"
								    assert duration < 60, f"Received Location {duration}!"

								  @with_processes(['ubloxd'])
								  def test_laikad_ublox_reset_start(self):
								    """Reset the ublox device, then check ephemeris and position output.

								    The pigeon is driven directly through the pigeond helpers instead of
								    the pigeond process. Afterwards the buffered ubloxGnss and
								    gnssMeasurements messages are inspected.
								    """
								    time.sleep(2)

								    pigeon, pm = pd.create_pigeon()
								    pd.init_baudrate(pigeon)
								    assert pigeon.reset_device(), "Could not reset device!"

								    # subscribe before receiving starts so the messages are buffered
								    laikad_sock = messaging.sub_sock("gnssMeasurements", timeout=0.1)
								    ublox_gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1)

								    # baudrate is initialised a second time after the reset
								    pd.init_baudrate(pigeon)
								    pd.initialize_pigeon(pigeon)
								    pd.run_receiving(pigeon, pm, 180)  # presumably a 180 s receive window - TODO confirm

								    ublox_msgs = messaging.drain_sock(ublox_gnss_sock)
								    laikad_msgs = messaging.drain_sock(laikad_sock)

								    gps_ephem_cnt = 0
								    glonass_ephem_cnt = 0
								    for um in ublox_msgs:
								      kind = um.ubloxGnss.which()
								      if kind == 'ephemeris':
								        gps_ephem_cnt += 1
								      elif kind == 'glonassEphemeris':
								        glonass_ephem_cnt += 1

								    assert gps_ephem_cnt > 0, "NO gps ephemeris collected!"
								    assert glonass_ephem_cnt > 0, "NO glonass ephemeris collected!"

								    pos_meas = 0
								    duration = -1
								    for lm in laikad_msgs:
								      # count consecutive messages carrying a position; a gap resets the streak
								      pos_meas = (pos_meas + 1) if 'positionECEF' in lm.gnssMeasurements.to_dict() else 0
								      if pos_meas > 5:
								        # elapsed time since the first laikad message, scaled by 1e-9
								        # (logMonoTime is presumably in nanoseconds)
								        duration = (lm.logMonoTime - laikad_msgs[0].logMonoTime)*1e-9
								        break

								    assert pos_meas > 5, "NOT enough positions at end of read!"
								    assert duration < 120, "Laikad took too long to get a Position!"
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# run the test suite when this file is executed directly
								if __name__ == "__main__":
								  unittest.main()