@ -4,7 +4,7 @@ import unittest
from collections import defaultdict
from datetime import datetime
from unittest import mock
from unittest . mock import Mock , patch
from unittest . mock import patch
from common . params import Params
from laika . constants import SECS_IN_DAY
@ -67,47 +67,47 @@ class TestLaikad(unittest.TestCase):
def setUp ( self ) :
Params ( ) . remove ( EPHEMERIS_CACHE )
def test_fetch_orbit s_non_blocking ( self ) :
def test_fetch_nav s_non_blocking ( self ) :
gpstime = GPSTime . from_datetime ( datetime ( 2021 , month = 3 , day = 1 ) )
laikad = Laikad ( )
laikad . fetch_orbit s ( gpstime , block = False )
laikad . fetch_nav s ( gpstime , block = False )
laikad . orbit_fetch_future . result ( 30 )
# Get results and save orbits to laikad:
laikad . fetch_orbit s ( gpstime , block = False )
laikad . fetch_nav s ( gpstime , block = False )
ephem = laikad . astro_dog . orbit s[ ' G01 ' ] [ 0 ]
ephem = laikad . astro_dog . nav s[ ' G01 ' ] [ 0 ]
self . assertIsNotNone ( ephem )
laikad . fetch_orbit s ( gpstime + 2 * SECS_IN_DAY , block = False )
laikad . fetch_nav s ( gpstime + 2 * SECS_IN_DAY , block = False )
laikad . orbit_fetch_future . result ( 30 )
# Get results and save orbits to laikad:
laikad . fetch_orbit s ( gpstime + 2 * SECS_IN_DAY , block = False )
laikad . fetch_nav s ( gpstime + 2 * SECS_IN_DAY , block = False )
ephem2 = laikad . astro_dog . orbit s[ ' G01 ' ] [ 0 ]
ephem2 = laikad . astro_dog . nav s[ ' G01 ' ] [ 0 ]
self . assertIsNotNone ( ephem )
self . assertNotEqual ( ephem , ephem2 )
def test_fetch_orbit s_with_wrong_clocks ( self ) :
def test_fetch_nav s_with_wrong_clocks ( self ) :
laikad = Laikad ( )
def check_has_orbit s ( ) :
self . assertGreater ( len ( laikad . astro_dog . orbit s) , 0 )
ephem = laikad . astro_dog . orbit s[ ' G01 ' ] [ 0 ]
def check_has_nav s ( ) :
self . assertGreater ( len ( laikad . astro_dog . nav s) , 0 )
ephem = laikad . astro_dog . nav s[ ' G01 ' ] [ 0 ]
self . assertIsNotNone ( ephem )
real_current_time = GPSTime . from_datetime ( datetime ( 2021 , month = 3 , day = 1 ) )
wrong_future_clock_time = real_current_time + SECS_IN_DAY
laikad . fetch_orbit s ( wrong_future_clock_time , block = True )
check_has_orbit s ( )
self . assertEqual ( laikad . last_fetch_orbit s_t , wrong_future_clock_time )
laikad . fetch_nav s ( wrong_future_clock_time , block = True )
check_has_nav s ( )
self . assertEqual ( laikad . last_fetch_nav s_t , wrong_future_clock_time )
# Test fetching orbits with earlier time
assert real_current_time < laikad . last_fetch_orbit s_t
assert real_current_time < laikad . last_fetch_nav s_t
laikad . astro_dog . orbits = { }
laikad . fetch_orbit s ( real_current_time , block = True )
check_has_orbit s ( )
self . assertEqual ( laikad . last_fetch_orbit s_t , real_current_time )
laikad . fetch_nav s ( real_current_time , block = True )
check_has_nav s ( )
self . assertEqual ( laikad . last_fetch_nav s_t , real_current_time )
def test_ephemeris_source_in_msg ( self ) :
data_mock = defaultdict ( str )
@ -115,22 +115,22 @@ class TestLaikad(unittest.TestCase):
gpstime = GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC
laikad = Laikad ( )
laikad . fetch_orbit s ( gpstime , block = True )
meas = get_measurement_mock ( gpstime , laikad . astro_dog . orbit s[ ' R01 ' ] [ 0 ] )
laikad . fetch_nav s ( gpstime , block = True )
meas = get_measurement_mock ( gpstime , laikad . astro_dog . nav s[ ' R01 ' ] [ 0 ] )
msg = create_measurement_msg ( meas )
self . assertEqual ( msg . ephemerisSource . type . raw , EphemerisSourceType . glonassIacUltraRapid )
self . assertEqual ( msg . ephemerisSource . type . raw , EphemerisSourceType . nav )
# Verify gps satellite returns same source
meas = get_measurement_mock ( gpstime , laikad . astro_dog . orbit s[ ' R01 ' ] [ 0 ] )
meas = get_measurement_mock ( gpstime , laikad . astro_dog . nav s[ ' R01 ' ] [ 0 ] )
msg = create_measurement_msg ( meas )
self . assertEqual ( msg . ephemerisSource . type . raw , EphemerisSourceType . glonassIacUltraRapid )
self . assertEqual ( msg . ephemerisSource . type . raw , EphemerisSourceType . nav )
# Test nasa source by using older date
gpstime = GPSTime . from_datetime ( datetime ( 2021 , month = 3 , day = 1 ) )
laikad = Laikad ( )
laikad . fetch_orbit s ( gpstime , block = True )
meas = get_measurement_mock ( gpstime , laikad . astro_dog . orbit s[ ' G01 ' ] [ 0 ] )
laikad . fetch_nav s ( gpstime , block = True )
meas = get_measurement_mock ( gpstime , laikad . astro_dog . nav s[ ' G01 ' ] [ 0 ] )
msg = create_measurement_msg ( meas )
self . assertEqual ( msg . ephemerisSource . type . raw , EphemerisSourceType . nasaUltraRapid )
self . assertEqual ( msg . ephemerisSource . type . raw , EphemerisSourceType . nav )
# Test nav source type
ephem = GPSEphemeris ( data_mock , gpstime )
@ -142,7 +142,7 @@ class TestLaikad(unittest.TestCase):
laikad = Laikad ( auto_update = True , valid_ephem_types = EphemerisType . ULTRA_RAPID_ORBIT )
correct_msgs = verify_messages ( self . logs , laikad )
correct_msgs_expected = 555
correct_msgs_expected = 554
self . assertEqual ( correct_msgs_expected , len ( correct_msgs ) )
self . assertEqual ( correct_msgs_expected , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
@ -161,9 +161,8 @@ class TestLaikad(unittest.TestCase):
def test_laika_online_nav_only ( self ) :
laikad = Laikad ( auto_update = True , valid_ephem_types = EphemerisType . NAV )
# Disable fetch_orbits to test NAV only
laikad . fetch_orbits = Mock ( )
correct_msgs = verify_messages ( self . logs , laikad )
correct_msgs_expected = 559
correct_msgs_expected = 554
self . assertEqual ( correct_msgs_expected , len ( correct_msgs ) )
self . assertEqual ( correct_msgs_expected , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
@ -171,45 +170,45 @@ class TestLaikad(unittest.TestCase):
def test_laika_offline ( self , downloader_mock ) :
downloader_mock . side_effect = DownloadFailed ( " Mock download failed " )
laikad = Laikad ( auto_update = False )
laikad . fetch_orbit s ( GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC , block = True )
laikad . fetch_nav s ( GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC , block = True )
@mock . patch ( ' laika.downloader.download_and_cache_file ' )
def test_download_failed_russian_source ( self , downloader_mock ) :
downloader_mock . side_effect = DownloadFailed
laikad = Laikad ( auto_update = False )
correct_msgs = verify_messages ( self . logs , laikad )
self . assertEqual ( 16 , len ( correct_msgs ) )
self . assertEqual ( 16 , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
self . assertEqual ( 0 , len ( correct_msgs ) )
self . assertEqual ( 0 , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_get_orbits ( self ) :
laikad = Laikad ( auto_update = False )
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad . fetch_orbit s ( self . first_gps_time , block = True )
self . dict_has_values ( laikad . astro_dog . orbit s)
laikad . fetch_nav s ( self . first_gps_time , block = True )
self . dict_has_values ( laikad . astro_dog . nav s)
@unittest . skip ( " Use to debug live data " )
def test_laika_get_orbit s_now ( self ) :
def test_laika_get_nav s_now ( self ) :
laikad = Laikad ( auto_update = False )
laikad . fetch_orbit s ( GPSTime . from_datetime ( datetime . utcnow ( ) ) , block = True )
laikad . fetch_nav s ( GPSTime . from_datetime ( datetime . utcnow ( ) ) , block = True )
prn = " G01 "
self . assertGreater ( len ( laikad . astro_dog . orbit s[ prn ] ) , 0 )
self . assertGreater ( len ( laikad . astro_dog . nav s[ prn ] ) , 0 )
prn = " R01 "
self . assertGreater ( len ( laikad . astro_dog . orbit s[ prn ] ) , 0 )
print ( min ( laikad . astro_dog . orbit s[ prn ] , key = lambda e : e . epoch ) . epoch . as_datetime ( ) )
self . assertGreater ( len ( laikad . astro_dog . nav s[ prn ] ) , 0 )
print ( min ( laikad . astro_dog . nav s[ prn ] , key = lambda e : e . epoch ) . epoch . as_datetime ( ) )
def test_get_orbit s_in_process ( self ) :
def test_get_nav s_in_process ( self ) :
laikad = Laikad ( auto_update = False )
has_orbit s = False
has_nav s = False
for m in self . logs :
laikad . process_gnss_msg ( m . ubloxGnss , m . logMonoTime , block = False )
if laikad . orbit_fetch_future is not None :
laikad . orbit_fetch_future . result ( )
vals = laikad . astro_dog . orbit s. values ( )
has_orbit s = len ( vals ) > 0 and max ( [ len ( v ) for v in vals ] ) > 0
if has_orbit s :
vals = laikad . astro_dog . nav s. values ( )
has_nav s = len ( vals ) > 0 and max ( [ len ( v ) for v in vals ] ) > 0
if has_nav s :
break
self . assertTrue ( has_orbit s )
self . assertGreater ( len ( laikad . astro_dog . orbit _fetched_times. _ranges ) , 0 )
self . assertTrue ( has_nav s )
self . assertGreater ( len ( laikad . astro_dog . navs _fetched_times. _ranges ) , 0 )
self . assertEqual ( None , laikad . orbit_fetch_future )
def test_cache ( self ) :
@ -228,17 +227,16 @@ class TestLaikad(unittest.TestCase):
wait_for_cache ( )
Params ( ) . remove ( EPHEMERIS_CACHE )
laikad . astro_dog . get_navs ( self . first_gps_time )
laikad . fetch_orbit s ( self . first_gps_time , block = True )
#laikad.astro_dog.get_navs(self.first_gps_time )
laikad . fetch_nav s ( self . first_gps_time , block = True )
# Wait for cache to save
wait_for_cache ( )
# Check both nav and orbits separate
laikad = Laikad ( auto_update = False , valid_ephem_types = EphemerisType . NAV , save_ephemeris = True )
# Verify orbits and nav are loaded from cache
self . dict_has_values ( laikad . astro_dog . orbits )
self . dict_has_values ( laikad . astro_dog . nav )
# Verify navs are loaded from cache
self . dict_has_values ( laikad . astro_dog . navs )
# Verify cache is working for only nav by running a segment
msg = verify_messages ( self . logs , laikad , return_one_success = True )
self . assertIsNotNone ( msg )
@ -246,7 +244,7 @@ class TestLaikad(unittest.TestCase):
with patch ( ' selfdrive.locationd.laikad.get_orbit_data ' , return_value = None ) as mock_method :
# Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently
laikad . astro_dog . orbit_fetched_times = TimeRangeHolder ( )
laikad . fetch_orbit s ( self . first_gps_time , block = False )
laikad . fetch_nav s ( self . first_gps_time , block = False )
mock_method . assert_not_called ( )
# Verify cache is working for only orbits by running a segment