#!/usr/bin/env python3
import unittest
from datetime import datetime
from laika . gps_time import GPSTime
from laika . helpers import ConstellationId
from laika . raw_gnss import GNSSMeasurement
from selfdrive . locationd . laikad import Laikad , create_measurement_msg
from selfdrive . test . openpilotci import get_url
from tools . lib . logreader import LogReader
def get_log ( segs = range ( 0 ) ) :
logs = [ ]
for i in segs :
logs . extend ( LogReader ( get_url ( " 4cf7a6ad03080c90|2021-09-29--13-46-36 " , i ) ) )
return [ m for m in logs if m . which ( ) == ' ubloxGnss ' ]
def process_msgs ( lr , laikad : Laikad ) :
good_msgs = [ ]
for m in lr :
msg = laikad . process_ublox_msg ( m . ubloxGnss , m . logMonoTime )
if msg is not None :
good_msgs . append ( msg )
return good_msgs
class TestLaikad ( unittest . TestCase ) :
@classmethod
def setUpClass ( cls ) :
cls . logs = get_log ( range ( 1 ) )
def test_create_msg_without_errors ( self ) :
gpstime = GPSTime . from_datetime ( datetime . now ( ) )
meas = GNSSMeasurement ( ConstellationId . GPS , 1 , gpstime . week , gpstime . tow , { ' C1C ' : 0. , ' D1C ' : 0. } , { ' C1C ' : 0. , ' D1C ' : 0. } )
# Fake observables_final to be correct
meas . observables_final = meas . observables
msg = create_measurement_msg ( meas )
self . assertEqual ( msg . constellationId , ' gps ' )
def test_laika_online ( self ) :
# Set to offline forces to use ephemeris messages
laikad = Laikad ( use_internet = True )
msgs = process_msgs ( self . logs , laikad )
correct_msgs = [ m for m in msgs if len ( m . gnssMeasurements . correctedMeasurements ) > 0 ]
correct_msgs_expected = 560
self . assertEqual ( correct_msgs_expected , len ( correct_msgs ) )
self . assertEqual ( correct_msgs_expected , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_offline ( self ) :
# Set to offline forces to use ephemeris messages
laikad = Laikad ( use_internet = False )
msgs = process_msgs ( self . logs , laikad )
correct_msgs = [ m for m in msgs if len ( m . gnssMeasurements . correctedMeasurements ) > 0 ]
self . assertEqual ( 256 , len ( correct_msgs ) )
self . assertEqual ( 256 , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
def test_laika_offline_ephem_at_start ( self ) :
# Test offline but process ephemeris msgs of segment first
laikad = Laikad ( use_internet = False )
ephemeris_logs = [ m for m in self . logs if m . ubloxGnss . which ( ) == ' ephemeris ' ]
msgs = process_msgs ( ephemeris_logs + self . logs , laikad )
correct_msgs = [ m for m in msgs if len ( m . gnssMeasurements . correctedMeasurements ) > 0 ]
self . assertEqual ( 554 , len ( correct_msgs ) )
self . assertGreaterEqual ( 554 , len ( [ m for m in correct_msgs if m . gnssMeasurements . positionECEF . valid ] ) )
if __name__ == " __main__ " :
unittest . main ( )