#!/usr/bin/env python3
import unittest
from datetime import datetime
from laika . ephemeris import EphemerisType
from laika . gps_time import GPSTime
from laika . helpers import ConstellationId
from laika . raw_gnss import GNSSMeasurement , read_raw_ublox
from selfdrive . locationd . laikad import Laikad , create_measurement_msg
from selfdrive . test . openpilotci import get_url
from tools . lib . logreader import LogReader
def get_log ( segs = range ( 0 ) ) :
logs = [ ]
for i in segs :
logs . extend ( LogReader ( get_url ( " 4cf7a6ad03080c90|2021-09-29--13-46-36 " , i ) ) )
return [ m for m in logs if m . which ( ) == ' ubloxGnss ' ]
def verify_messages ( lr , laikad ) :
good_msgs = [ ]
for m in lr :
msg = laikad . process_ublox_msg ( m . ubloxGnss , m . logMonoTime )
if msg is not None and len ( msg . gnssMeasurements . correctedMeasurements ) > 0 :
good_msgs . append ( msg )
return good_msgs
class TestLaikad ( unittest . TestCase ) :
@classmethod
def setUpClass ( cls ) :
cls . logs = get_log ( range ( 1 ) )
def test_create_msg_without_errors ( self ) :
gpstime = GPSTime . from_datetime ( datetime . now ( ) )
meas = GNSSMeasurement ( ConstellationId . GPS , 1 , gpstime . week , gpstime . tow , { ' C1C ' : 0. , ' D1C ' : 0. } , { ' C1C ' : 0. , ' D1C ' : 0. } )
# Fake observables_final to be correct
meas . observables_final = meas . observables
msg = create_measurement_msg ( meas )
self . assertEqual ( msg . constellationId , ' gps ' )
def test_laika_online ( self ) :
laikad = Laikad ( auto_update = True , valid_ephem_types = EphemerisType . ULTRA_RAPID_ORBIT )
correct_msgs = verify_messages ( self . logs , laikad )
correct_msgs_expected = 560
self . assertEqual ( correct_msgs_expected , len ( correct_msgs ) )
self . assertEqual ( correct_msgs_expected , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_online_nav_only ( self ) :
laikad = Laikad ( auto_update = True , valid_ephem_types = EphemerisType . NAV )
correct_msgs = verify_messages ( self . logs , laikad )
correct_msgs_expected = 560
self . assertEqual ( correct_msgs_expected , len ( correct_msgs ) )
self . assertEqual ( correct_msgs_expected , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_offline ( self ) :
# Set auto_update to false forces to use ephemeris messages
laikad = Laikad ( auto_update = False )
correct_msgs = verify_messages ( self . logs , laikad )
self . assertEqual ( 256 , len ( correct_msgs ) )
self . assertEqual ( 256 , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_offline_ephem_at_start ( self ) :
# Test offline but process ephemeris msgs of segment first
laikad = Laikad ( auto_update = False , valid_ephem_types = EphemerisType . NAV )
ephemeris_logs = [ m for m in self . logs if m . ubloxGnss . which ( ) == ' ephemeris ' ]
correct_msgs = verify_messages ( ephemeris_logs + self . logs , laikad )
self . assertEqual ( 554 , len ( correct_msgs ) )
self . assertGreaterEqual ( 554 , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_get_orbits ( self ) :
laikad = Laikad ( auto_update = False )
first_gps_time = None
for m in self . logs :
if m . ubloxGnss . which == ' measurementReport ' :
new_meas = read_raw_ublox ( m . ubloxGnss . measurementReport )
if len ( new_meas ) != 0 :
first_gps_time = new_meas [ 0 ] . recv_time
break
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad . fetch_orbits ( first_gps_time , block = True )
self . assertEqual ( 29 , len ( laikad . astro_dog . orbits . keys ( ) ) )
@unittest . skip ( " Use to debug live data " )
def test_laika_get_orbits_now ( self ) :
laikad = Laikad ( auto_update = False )
laikad . fetch_orbits ( GPSTime . from_datetime ( datetime . utcnow ( ) ) , block = True )
prn = " G01 "
self . assertLess ( 0 , len ( laikad . astro_dog . orbits [ prn ] ) )
prn = " R01 "
self . assertLess ( 0 , len ( laikad . astro_dog . orbits [ prn ] ) )
print ( min ( laikad . astro_dog . orbits [ prn ] , key = lambda e : e . epoch ) . epoch . as_datetime ( ) )
if __name__ == " __main__ " :
unittest . main ( )