#!/usr/bin/env python3 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								import  time 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  unittest 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  collections  import  defaultdict 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  datetime  import  datetime 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  unittest  import  mock 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  unittest . mock  import  patch 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  common . params  import  Params 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  laika . constants  import  SECS_IN_DAY 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  laika . downloader  import  DownloadFailed 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  laika . ephemeris  import  EphemerisType ,  GPSEphemeris 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  laika . gps_time  import  GPSTime 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  laika . helpers  import  ConstellationId ,  TimeRangeHolder 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  laika . raw_gnss  import  GNSSMeasurement ,  read_raw_ublox 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  selfdrive . locationd . laikad  import  EPHEMERIS_CACHE ,  EphemerisSourceType ,  Laikad ,  create_measurement_msg 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								from  selfdrive . test . openpilotci  import  get_url 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . logreader  import  LogReader 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  get_log ( segs = range ( 0 ) ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  logs  =  [ ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  for  i  in  segs : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    logs . extend ( LogReader ( get_url ( " 4cf7a6ad03080c90|2021-09-29--13-46-36 " ,  i ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  all_logs  =  [ m  for  m  in  logs  if  m . which ( )  ==  ' ubloxGnss ' ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  low_gnss  =  [ ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  for  m  in  all_logs : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  m . ubloxGnss . which ( )  !=  ' measurementReport ' : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    MAX_MEAS  =  7 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  m . ubloxGnss . measurementReport . numMeas  >  MAX_MEAS : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      mb  =  m . as_builder ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      mb . ubloxGnss . measurementReport . numMeas  =  MAX_MEAS 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      mb . ubloxGnss . measurementReport . measurements  =  list ( m . ubloxGnss . measurementReport . measurements ) [ : MAX_MEAS ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      mb . ubloxGnss . measurementReport . measurements [ 0 ] . pseudorange  + =  1000 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      low_gnss . append ( mb . as_reader ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      low_gnss . append ( m ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  all_logs ,  low_gnss 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								def  verify_messages ( lr ,  laikad ,  return_one_success = False ) : 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  good_msgs  =  [ ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  for  m  in  lr : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    msg  =  laikad . process_gnss_msg ( m . ubloxGnss ,  m . logMonoTime ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    if  msg  is  not  None  and  len ( msg . gnssMeasurements . correctedMeasurements )  >  0 : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      good_msgs . append ( msg ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      if  return_one_success : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        return  msg 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  return  good_msgs 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								def  get_first_gps_time ( logs ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  for  m  in  logs : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  m . ubloxGnss . which  ==  ' measurementReport ' : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      new_meas  =  read_raw_ublox ( m . ubloxGnss . measurementReport ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  len ( new_meas )  >  0 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        return  new_meas [ 0 ] . recv_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  get_measurement_mock ( gpstime ,  sat_ephemeris ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  meas  =  GNSSMeasurement ( ConstellationId . GPS ,  1 ,  gpstime . week ,  gpstime . tow ,  { ' C1C ' :  0. ,  ' D1C ' :  0. } ,  { ' C1C ' :  0. ,  ' D1C ' :  0. } ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  # Fake measurement being processed 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  meas . observables_final  =  meas . observables 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  meas . sat_ephemeris  =  sat_ephemeris 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  meas 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC  =  GPSTime . from_datetime ( datetime ( 2022 ,  month = 1 ,  day = 29 ,  hour = 12 ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								class  TestLaikad ( unittest . TestCase ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  @classmethod 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  setUpClass ( cls ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    logs ,  low_gnss  =  get_log ( range ( 1 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    cls . logs  =  logs 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    cls . low_gnss  =  low_gnss 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    first_gps_time  =  get_first_gps_time ( logs ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    cls . first_gps_time  =  first_gps_time 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  setUp ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    Params ( ) . remove ( EPHEMERIS_CACHE ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_fetch_navs_non_blocking ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    gpstime  =  GPSTime . from_datetime ( datetime ( 2021 ,  month = 3 ,  day = 1 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( gpstime ,  block = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . orbit_fetch_future . result ( 30 ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Get results and save orbits to laikad: 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( gpstime ,  block = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    ephem  =  laikad . astro_dog . navs [ ' G01 ' ] [ 0 ] 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertIsNotNone ( ephem ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( gpstime + 2 * SECS_IN_DAY ,  block = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . orbit_fetch_future . result ( 30 ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Get results and save orbits to laikad: 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( gpstime  +  2  *  SECS_IN_DAY ,  block = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    ephem2  =  laikad . astro_dog . navs [ ' G01 ' ] [ 0 ] 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertIsNotNone ( ephem ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertNotEqual ( ephem ,  ephem2 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_fetch_navs_with_wrong_clocks ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    def  check_has_navs ( ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      self . assertGreater ( len ( laikad . astro_dog . navs ) ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      ephem  =  laikad . astro_dog . navs [ ' G01 ' ] [ 0 ] 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      self . assertIsNotNone ( ephem ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    real_current_time  =  GPSTime . from_datetime ( datetime ( 2021 ,  month = 3 ,  day = 1 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    wrong_future_clock_time  =  real_current_time  +  SECS_IN_DAY 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( wrong_future_clock_time ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    check_has_navs ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertEqual ( laikad . last_fetch_navs_t ,  wrong_future_clock_time ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # Test fetching orbits with earlier time 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    assert  real_current_time  <  laikad . last_fetch_navs_t 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad . astro_dog . orbits  =  { } 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( real_current_time ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    check_has_navs ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertEqual ( laikad . last_fetch_navs_t ,  real_current_time ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_ephemeris_source_in_msg ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    data_mock  =  defaultdict ( str ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    data_mock [ ' sv_id ' ]  =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    gpstime  =  GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( gpstime ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    meas  =  get_measurement_mock ( gpstime ,  laikad . astro_dog . navs [ ' R01 ' ] [ 0 ] ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    msg  =  create_measurement_msg ( meas ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertEqual ( msg . ephemerisSource . type . raw ,  EphemerisSourceType . nav ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Verify gps satellite returns same source 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    meas  =  get_measurement_mock ( gpstime ,  laikad . astro_dog . navs [ ' R01 ' ] [ 0 ] ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    msg  =  create_measurement_msg ( meas ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertEqual ( msg . ephemerisSource . type . raw ,  EphemerisSourceType . nav ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # Test nasa source by using older date 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    gpstime  =  GPSTime . from_datetime ( datetime ( 2021 ,  month = 3 ,  day = 1 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( gpstime ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    meas  =  get_measurement_mock ( gpstime ,  laikad . astro_dog . navs [ ' G01 ' ] [ 0 ] ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    msg  =  create_measurement_msg ( meas ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertEqual ( msg . ephemerisSource . type . raw ,  EphemerisSourceType . nav ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Test nav source type 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    ephem  =  GPSEphemeris ( data_mock ,  gpstime ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    meas  =  get_measurement_mock ( gpstime ,  ephem ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    msg  =  create_measurement_msg ( meas ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertEqual ( msg . ephemerisSource . type . raw ,  EphemerisSourceType . nav ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_laika_online ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = True ,  valid_ephem_types = EphemerisType . ULTRA_RAPID_ORBIT ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    correct_msgs  =  verify_messages ( self . logs ,  laikad ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    correct_msgs_expected  =  554 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertEqual ( correct_msgs_expected ,  len ( correct_msgs ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertEqual ( correct_msgs_expected ,  len ( [ m  for  m  in  correct_msgs  if  m . gnssMeasurements . positionECEF . valid ] ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_kf_becomes_valid ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = False ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    m  =  self . logs [ 0 ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertFalse ( all ( laikad . kf_valid ( m . logMonoTime  *  1e-9 ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    kf_valid  =  False 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    for  m  in  self . logs : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      laikad . process_gnss_msg ( m . ubloxGnss ,  m . logMonoTime ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      kf_valid  =  all ( laikad . kf_valid ( m . logMonoTime  *  1e-9 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  kf_valid : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        break 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertTrue ( kf_valid ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_laika_online_nav_only ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = True ,  valid_ephem_types = EphemerisType . NAV ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Disable fetch_orbits to test NAV only 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    correct_msgs  =  verify_messages ( self . logs ,  laikad ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    correct_msgs_expected  =  554 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertEqual ( correct_msgs_expected ,  len ( correct_msgs ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertEqual ( correct_msgs_expected ,  len ( [ m  for  m  in  correct_msgs  if  m . gnssMeasurements . positionECEF . valid ] ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  @mock . patch ( ' laika.downloader.download_and_cache_file ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  test_laika_offline ( self ,  downloader_mock ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    downloader_mock . side_effect  =  DownloadFailed ( " Mock download failed " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  @mock . patch ( ' laika.downloader.download_and_cache_file ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  test_download_failed_russian_source ( self ,  downloader_mock ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    downloader_mock . side_effect  =  DownloadFailed 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = False ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    correct_msgs  =  verify_messages ( self . logs ,  laikad ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertEqual ( 0 ,  len ( correct_msgs ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertEqual ( 0 ,  len ( [ m  for  m  in  correct_msgs  if  m . gnssMeasurements . positionECEF . valid ] ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_laika_get_orbits ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Pretend process has loaded the orbits on startup by using the time of the first gps message. 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( self . first_gps_time ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . dict_has_values ( laikad . astro_dog . navs ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  @unittest . skip ( " Use to debug live data " ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_laika_get_navs_now ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( GPSTime . from_datetime ( datetime . utcnow ( ) ) ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    prn  =  " G01 " 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertGreater ( len ( laikad . astro_dog . navs [ prn ] ) ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    prn  =  " R01 " 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertGreater ( len ( laikad . astro_dog . navs [ prn ] ) ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    print ( min ( laikad . astro_dog . navs [ prn ] ,  key = lambda  e :  e . epoch ) . epoch . as_datetime ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_get_navs_in_process ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    has_navs  =  False 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    for  m  in  self . logs : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      laikad . process_gnss_msg ( m . ubloxGnss ,  m . logMonoTime ,  block = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      if  laikad . orbit_fetch_future  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        laikad . orbit_fetch_future . result ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      vals  =  laikad . astro_dog . navs . values ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      has_navs  =  len ( vals )  >  0  and  max ( [ len ( v )  for  v  in  vals ] )  >  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  has_navs : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								        break 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertTrue ( has_navs ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertGreater ( len ( laikad . astro_dog . navs_fetched_times . _ranges ) ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    self . assertEqual ( None ,  laikad . orbit_fetch_future ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_cache ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = True ,  save_ephemeris = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    def  wait_for_cache ( ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      max_time  =  2 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      while  Params ( ) . get ( EPHEMERIS_CACHE )  is  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        time . sleep ( 0.1 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        max_time  - =  0.1 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								        if  max_time  <  0 : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								          self . fail ( " Cache has not been written after 2 seconds " ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Test cache with no ephemeris 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad . cache_ephemeris ( t = GPSTime ( 0 ,  0 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    wait_for_cache ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    Params ( ) . remove ( EPHEMERIS_CACHE ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    #laikad.astro_dog.get_navs(self.first_gps_time) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad . fetch_navs ( self . first_gps_time ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # Wait for cache to save 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    wait_for_cache ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # Check both nav and orbits separate 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( auto_update = False ,  valid_ephem_types = EphemerisType . NAV ,  save_ephemeris = True ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Verify navs are loaded from cache 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . dict_has_values ( laikad . astro_dog . navs ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    # Verify cache is working for only nav by running a segment 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    msg  =  verify_messages ( self . logs ,  laikad ,  return_one_success = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertIsNotNone ( msg ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    with  patch ( ' selfdrive.locationd.laikad.get_orbit_data ' ,  return_value = None )  as  mock_method : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      laikad . astro_dog . orbit_fetched_times  =  TimeRangeHolder ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      laikad . fetch_navs ( self . first_gps_time ,  block = False ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      mock_method . assert_not_called ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Verify cache is working for only orbits by running a segment 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      laikad  =  Laikad ( auto_update = False ,  valid_ephem_types = EphemerisType . ULTRA_RAPID_ORBIT ,  save_ephemeris = True ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      msg  =  verify_messages ( self . logs ,  laikad ,  return_one_success = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      self . assertIsNotNone ( msg ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Verify orbit data is not downloaded 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      mock_method . assert_not_called ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  test_low_gnss_meas ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    cnt  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    laikad  =  Laikad ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    for  m  in  self . low_gnss : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg  =  laikad . process_gnss_msg ( m . ubloxGnss ,  m . logMonoTime ,  block = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  msg  is  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      gm  =  msg . gnssMeasurements 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  len ( gm . correctedMeasurements )  !=  0  and  gm . positionECEF . valid : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        cnt  + =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertEqual ( cnt ,  554 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  dict_has_values ( self ,  dct ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertGreater ( len ( dct ) ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . assertGreater ( min ( [ len ( v )  for  v  in  dct . values ( ) ] ) ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								if  __name__  ==  " __main__ " : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  unittest . main ( )