#!/usr/bin/env python3
import time
import unittest
from cereal import log
from openpilot . common . params import Params
from datetime import datetime
from unittest import mock
from laika . constants import SECS_IN_DAY
from laika . downloader import DownloadFailed
from laika . ephemeris import EphemerisType
from laika . gps_time import GPSTime
from laika . helpers import ConstellationId
from laika . raw_gnss import GNSSMeasurement , read_raw_ublox , read_raw_qcom
from openpilot . selfdrive . locationd . laikad import EPHEMERIS_CACHE , Laikad
from openpilot . selfdrive . test . openpilotci import get_url
from openpilot . tools . lib . logreader import LogReader
from openpilot . selfdrive . test . process_replay . process_replay import get_process_config , replay_process
GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime . from_datetime ( datetime ( 2022 , month = 1 , day = 29 , hour = 12 ) )
UBLOX_TEST_ROUTE = " 4cf7a6ad03080c90|2021-09-29--13-46-36 "
QCOM_TEST_ROUTE = " 616dc83ca1f7f11e|2023-07-11--10-52-31 "
def get_log_ublox ( ) :
logs = LogReader ( get_url ( UBLOX_TEST_ROUTE , 0 ) )
ublox_cfg = get_process_config ( " ubloxd " )
all_logs = replay_process ( ublox_cfg , logs )
low_gnss = [ ]
for m in all_logs :
if m . which ( ) != " ubloxGnss " or m . ubloxGnss . which ( ) != ' measurementReport ' :
continue
MAX_MEAS = 7
if m . ubloxGnss . measurementReport . numMeas > MAX_MEAS :
mb = log . Event . new_message ( ubloxGnss = m . ubloxGnss . to_dict ( ) )
mb . logMonoTime = m . logMonoTime
mb . ubloxGnss . measurementReport . numMeas = MAX_MEAS
mb . ubloxGnss . measurementReport . measurements = list ( m . ubloxGnss . measurementReport . measurements ) [ : MAX_MEAS ]
mb . ubloxGnss . measurementReport . measurements [ 0 ] . pseudorange + = 1000
low_gnss . append ( mb . as_reader ( ) )
else :
low_gnss . append ( m )
return all_logs , low_gnss
def get_log_qcom ( ) :
logs = LogReader ( get_url ( QCOM_TEST_ROUTE , 0 ) )
all_logs = [ m for m in logs if m . which ( ) == ' qcomGnss ' ]
return all_logs
def verify_messages ( lr , laikad , return_one_success = False ) :
good_msgs = [ ]
for m in lr :
if m . which ( ) == ' ubloxGnss ' :
gnss_msg = m . ubloxGnss
elif m . which ( ) == ' qcomGnss ' :
gnss_msg = m . qcomGnss
else :
continue
msg = laikad . process_gnss_msg ( gnss_msg , m . logMonoTime , block = True )
if msg is not None and len ( msg . gnssMeasurements . correctedMeasurements ) > 0 :
good_msgs . append ( msg )
if return_one_success :
return msg
return good_msgs
def get_first_gps_time ( logs ) :
for m in logs :
if m . which ( ) == ' ubloxGnss ' :
if m . ubloxGnss . which == ' measurementReport ' :
new_meas = read_raw_ublox ( m . ubloxGnss . measurementReport )
if len ( new_meas ) > 0 :
return new_meas [ 0 ] . recv_time
elif m . which ( ) == " qcomGnss " :
if m . qcomGnss . which == ' measurementReport ' :
new_meas = read_raw_qcom ( m . qcomGnss . measurementReport )
if len ( new_meas ) > 0 :
return new_meas [ 0 ] . recv_time
def get_measurement_mock ( gpstime , sat_ephemeris ) :
meas = GNSSMeasurement ( ConstellationId . GPS , 1 , gpstime . week , gpstime . tow , { ' C1C ' : 0. , ' D1C ' : 0. } , { ' C1C ' : 0. , ' D1C ' : 0. } )
# Fake measurement being processed
meas . observables_final = meas . observables
meas . sat_ephemeris = sat_ephemeris
return meas
class TestLaikad ( unittest . TestCase ) :
@classmethod
def setUpClass ( cls ) :
logs , low_gnss = get_log_ublox ( )
cls . logs = logs
cls . low_gnss = low_gnss
cls . logs_qcom = get_log_qcom ( )
first_gps_time = get_first_gps_time ( logs )
cls . first_gps_time = first_gps_time
def setUp ( self ) :
Params ( ) . remove ( EPHEMERIS_CACHE )
def test_fetch_navs_non_blocking ( self ) :
gpstime = GPSTime . from_datetime ( datetime ( 2021 , month = 3 , day = 1 ) )
laikad = Laikad ( )
laikad . fetch_navs ( gpstime , block = False )
laikad . orbit_fetch_future . result ( 30 )
# Get results and save orbits to laikad:
laikad . fetch_navs ( gpstime , block = False )
ephem = laikad . astro_dog . navs [ ' G01 ' ] [ 0 ]
self . assertIsNotNone ( ephem )
laikad . fetch_navs ( gpstime + 2 * SECS_IN_DAY , block = False )
laikad . orbit_fetch_future . result ( 30 )
# Get results and save orbits to laikad:
laikad . fetch_navs ( gpstime + 2 * SECS_IN_DAY , block = False )
ephem2 = laikad . astro_dog . navs [ ' G01 ' ] [ 0 ]
self . assertIsNotNone ( ephem )
self . assertNotEqual ( ephem , ephem2 )
def test_fetch_navs_with_wrong_clocks ( self ) :
laikad = Laikad ( )
def check_has_navs ( ) :
self . assertGreater ( len ( laikad . astro_dog . navs ) , 0 )
ephem = laikad . astro_dog . navs [ ' G01 ' ] [ 0 ]
self . assertIsNotNone ( ephem )
real_current_time = GPSTime . from_datetime ( datetime ( 2021 , month = 3 , day = 1 ) )
wrong_future_clock_time = real_current_time + SECS_IN_DAY
laikad . fetch_navs ( wrong_future_clock_time , block = True )
check_has_navs ( )
self . assertEqual ( laikad . last_fetch_navs_t , wrong_future_clock_time )
# Test fetching orbits with earlier time
assert real_current_time < laikad . last_fetch_navs_t
laikad . astro_dog . orbits = { }
laikad . fetch_navs ( real_current_time , block = True )
check_has_navs ( )
self . assertEqual ( laikad . last_fetch_navs_t , real_current_time )
def test_laika_online ( self ) :
laikad = Laikad ( auto_update = True , valid_ephem_types = EphemerisType . ULTRA_RAPID_ORBIT )
correct_msgs = verify_messages ( self . logs , laikad )
correct_msgs_expected = 560
self . assertEqual ( correct_msgs_expected , len ( correct_msgs ) )
self . assertEqual ( correct_msgs_expected , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_kf_becomes_valid ( self ) :
laikad = Laikad ( auto_update = False )
m = self . logs [ 0 ]
self . assertFalse ( all ( laikad . kf_valid ( m . logMonoTime * 1e-9 ) ) )
kf_valid = False
for m in self . logs :
if m . which ( ) != ' ubloxGnss ' :
continue
laikad . process_gnss_msg ( m . ubloxGnss , m . logMonoTime , block = True )
kf_valid = all ( laikad . kf_valid ( m . logMonoTime * 1e-9 ) )
if kf_valid :
break
self . assertTrue ( kf_valid )
def test_laika_online_nav_only ( self ) :
for use_qcom , logs in zip ( [ True , False ] , [ self . logs_qcom , self . logs ] , strict = True ) :
laikad = Laikad ( auto_update = True , valid_ephem_types = EphemerisType . NAV , use_qcom = use_qcom )
# Disable fetch_orbits to test NAV only
correct_msgs = verify_messages ( logs , laikad )
correct_msgs_expected = 55 if use_qcom else 560
valid_fix_expected = 55 if use_qcom else 560
self . assertEqual ( correct_msgs_expected , len ( correct_msgs ) )
self . assertEqual ( valid_fix_expected , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
@mock . patch ( ' laika.downloader.download_and_cache_file ' )
def test_laika_offline ( self , downloader_mock ) :
downloader_mock . side_effect = DownloadFailed ( " Mock download failed " )
laikad = Laikad ( auto_update = False )
laikad . fetch_navs ( GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC , block = True )
@mock . patch ( ' laika.downloader.download_and_cache_file ' )
def test_download_failed_russian_source ( self , downloader_mock ) :
downloader_mock . side_effect = DownloadFailed
laikad = Laikad ( auto_update = False )
correct_msgs = verify_messages ( self . logs , laikad )
expected_msgs = 376
self . assertEqual ( expected_msgs , len ( correct_msgs ) )
self . assertEqual ( expected_msgs , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_get_orbits ( self ) :
laikad = Laikad ( auto_update = False )
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad . fetch_navs ( self . first_gps_time , block = True )
self . dict_has_values ( laikad . astro_dog . navs )
@unittest . skip ( " Use to debug live data " )
def test_laika_get_navs_now ( self ) :
laikad = Laikad ( auto_update = False )
laikad . fetch_navs ( GPSTime . from_datetime ( datetime . utcnow ( ) ) , block = True )
prn = " G01 "
self . assertGreater ( len ( laikad . astro_dog . navs [ prn ] ) , 0 )
prn = " R01 "
self . assertGreater ( len ( laikad . astro_dog . navs [ prn ] ) , 0 )
def test_get_navs_in_process ( self ) :
for auto_fetch_navs in [ True , False ] :
for use_qcom , logs in zip ( [ True , False ] , [ self . logs_qcom , self . logs ] , strict = True ) :
laikad = Laikad ( auto_update = False , use_qcom = use_qcom , auto_fetch_navs = auto_fetch_navs )
has_navs = False
has_fix = False
seen_chip_eph = False
seen_internet_eph = False
for m in logs :
if m . which ( ) != ' ubloxGnss ' and m . which ( ) != ' qcomGnss ' :
continue
gnss_msg = m . qcomGnss if use_qcom else m . ubloxGnss
out_msg = laikad . process_gnss_msg ( gnss_msg , m . logMonoTime , block = False )
if laikad . orbit_fetch_future is not None :
laikad . orbit_fetch_future . result ( )
vals = laikad . astro_dog . navs . values ( )
has_navs = len ( vals ) > 0 and max ( [ len ( v ) for v in vals ] ) > 0
vals = laikad . astro_dog . qcom_polys . values ( )
has_polys = len ( vals ) > 0 and max ( [ len ( v ) for v in vals ] ) > 0
has_fix = has_fix or out_msg . gnssMeasurements . positionECEF . valid
if len ( out_msg . gnssMeasurements . ephemerisStatuses ) :
seen_chip_eph = seen_chip_eph or any ( x . source == ' gnssChip ' for x in out_msg . gnssMeasurements . ephemerisStatuses )
seen_internet_eph = seen_internet_eph or any ( x . source == ' internet ' for x in out_msg . gnssMeasurements . ephemerisStatuses )
self . assertTrue ( has_navs or has_polys )
self . assertTrue ( has_fix )
self . assertTrue ( seen_chip_eph or auto_fetch_navs )
self . assertTrue ( seen_internet_eph or not auto_fetch_navs )
self . assertEqual ( len ( laikad . astro_dog . navs_fetched_times . _ranges ) , 0 )
self . assertEqual ( None , laikad . orbit_fetch_future )
def test_cache ( self ) :
use_qcom = True
for use_qcom , logs in zip ( [ True , False ] , [ self . logs_qcom , self . logs ] , strict = True ) :
Params ( ) . remove ( EPHEMERIS_CACHE )
laikad = Laikad ( auto_update = True , save_ephemeris = True , use_qcom = use_qcom )
def wait_for_cache ( ) :
max_time = 2
while Params ( ) . get ( EPHEMERIS_CACHE ) is None :
time . sleep ( 0.1 )
max_time - = 0.1
if max_time < 0 :
self . fail ( " Cache has not been written after 2 seconds " )
# Test cache with no ephemeris
laikad . last_report_time = GPSTime ( 1 , 0 )
laikad . cache_ephemeris ( )
if Params ( ) . get ( EPHEMERIS_CACHE ) is not None :
self . fail ( " Cache should not have been written without valid ephem " )
#laikad.astro_dog.get_navs(self.first_gps_time)
msg = verify_messages ( logs , laikad , return_one_success = True )
laikad . cache_ephemeris ( )
wait_for_cache ( )
# Check both nav and orbits separate
laikad = Laikad ( auto_update = False , valid_ephem_types = EphemerisType . NAV ,
save_ephemeris = True , use_qcom = use_qcom , auto_fetch_navs = False )
# Verify navs are loaded from cache
self . dict_has_values ( laikad . astro_dog . navs )
# Verify cache is working for only nav by running a segment
msg = verify_messages ( logs , laikad , return_one_success = True )
self . assertTrue ( len ( msg . gnssMeasurements . ephemerisStatuses ) )
self . assertTrue ( any ( x . source == ' cache ' for x in msg . gnssMeasurements . ephemerisStatuses ) )
self . assertIsNotNone ( msg )
#TODO test cache with only orbits
#with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method:
# # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently
# laikad.astro_dog.orbit_fetched_times = TimeRangeHolder()
# laikad.fetch_navs(self.first_gps_time, block=False)
# mock_method.assert_not_called()
# # Verify cache is working for only orbits by running a segment
# laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True)
# msg = verify_messages(self.logs, laikad, return_one_success=True)
# self.assertIsNotNone(msg)
# # Verify orbit data is not downloaded
# mock_method.assert_not_called()
#break
def test_low_gnss_meas ( self ) :
cnt = 0
laikad = Laikad ( )
for m in self . low_gnss :
msg = laikad . process_gnss_msg ( m . ubloxGnss , m . logMonoTime , block = True )
if msg is None :
continue
gm = msg . gnssMeasurements
if len ( gm . correctedMeasurements ) != 0 and gm . positionECEF . valid :
cnt + = 1
self . assertEqual ( cnt , 560 )
def dict_has_values ( self , dct ) :
self . assertGreater ( len ( dct ) , 0 )
self . assertGreater ( min ( [ len ( v ) for v in dct . values ( ) ] ) , 0 )
if __name__ == " __main__ " :
unittest . main ( )