import unittest
import numpy as np
from laika import AstroDog
from laika . helpers import ConstellationId
from laika . raw_gnss import calc_pos_fix , correct_measurements , process_measurements , read_raw_ublox
from selfdrive . test . openpilotci import get_url
from tools . lib . logreader import LogReader
def get_gnss_measurements ( log_reader ) :
gnss_measurements = [ ]
for msg in log_reader :
if msg . which ( ) == " ubloxGnss " :
ublox_msg = msg . ubloxGnss
if ublox_msg . which == ' measurementReport ' :
report = ublox_msg . measurementReport
if len ( report . measurements ) > 0 :
gnss_measurements . append ( read_raw_ublox ( report ) )
return gnss_measurements
class TestUbloxProcessing ( unittest . TestCase ) :
NUM_TEST_PROCESS_MEAS = 10
@classmethod
def setUpClass ( cls ) :
lr = LogReader ( get_url ( " 4cf7a6ad03080c90|2021-09-29--13-46-36 " , 0 ) )
cls . gnss_measurements = get_gnss_measurements ( lr )
def test_read_ublox_raw ( self ) :
count_gps = 0
count_glonass = 0
for measurements in self . gnss_measurements :
for m in measurements :
if m . constellation_id == ConstellationId . GPS :
count_gps + = 1
elif m . constellation_id == ConstellationId . GLONASS :
count_glonass + = 1
self . assertEqual ( count_gps , 5036 )
self . assertEqual ( count_glonass , 3651 )
def test_get_fix ( self ) :
dog = AstroDog ( )
position_fix_found = 0
count_processed_measurements = 0
count_corrected_measurements = 0
position_fix_found_after_correcting = 0
pos_ests = [ ]
for measurements in self . gnss_measurements [ : self . NUM_TEST_PROCESS_MEAS ] :
processed_meas = process_measurements ( measurements , dog )
count_processed_measurements + = len ( processed_meas )
pos_fix = calc_pos_fix ( processed_meas )
if len ( pos_fix ) > 0 and all ( pos_fix [ 0 ] != 0 ) :
position_fix_found + = 1
corrected_meas = correct_measurements ( processed_meas , pos_fix [ 0 ] [ : 3 ] , dog )
count_corrected_measurements + = len ( corrected_meas )
pos_fix = calc_pos_fix ( corrected_meas )
if len ( pos_fix ) > 0 and all ( pos_fix [ 0 ] != 0 ) :
pos_ests . append ( pos_fix [ 0 ] )
position_fix_found_after_correcting + = 1
mean_fix = np . mean ( np . array ( pos_ests ) [ : , : 3 ] , axis = 0 )
np . testing . assert_allclose ( mean_fix , [ - 2452306.662377 , - 4778343.136806 , 3428550.090557 ] , rtol = 0 , atol = 1 )
# Note that can happen that there are less corrected measurements compared to processed when they are invalid.
# However, not for the current segment
self . assertEqual ( position_fix_found , self . NUM_TEST_PROCESS_MEAS )
self . assertEqual ( position_fix_found_after_correcting , self . NUM_TEST_PROCESS_MEAS )
self . assertEqual ( count_processed_measurements , 69 )
self . assertEqual ( count_corrected_measurements , 69 )
if __name__ == " __main__ " :
unittest . main ( )