import unittest
import time
import numpy as np
from laika import AstroDog
from laika . helpers import ConstellationId
from laika . raw_gnss import correct_measurements , process_measurements , read_raw_ublox
from laika . opt import calc_pos_fix
from openpilot . selfdrive . test . openpilotci import get_url
from openpilot . tools . lib . logreader import LogReader
from openpilot . selfdrive . test . helpers import with_processes
import cereal . messaging as messaging
def get_gnss_measurements ( log_reader ) :
gnss_measurements = [ ]
for msg in log_reader :
if msg . which ( ) == " ubloxGnss " :
ublox_msg = msg . ubloxGnss
if ublox_msg . which == ' measurementReport ' :
report = ublox_msg . measurementReport
if len ( report . measurements ) > 0 :
gnss_measurements . append ( read_raw_ublox ( report ) )
return gnss_measurements
def get_ublox_raw ( log_reader ) :
ublox_raw = [ ]
for msg in log_reader :
if msg . which ( ) == " ubloxRaw " :
ublox_raw . append ( msg )
return ublox_raw
class TestUbloxProcessing ( unittest . TestCase ) :
NUM_TEST_PROCESS_MEAS = 10
@classmethod
def setUpClass ( cls ) :
lr = LogReader ( get_url ( " 4cf7a6ad03080c90|2021-09-29--13-46-36 " , 0 ) )
cls . gnss_measurements = get_gnss_measurements ( lr )
# test gps ephemeris continuity check (drive has ephemeris issues with cutover data)
lr = LogReader ( get_url ( " 37b6542f3211019a|2023-01-15--23-45-10 " , 14 ) )
cls . ublox_raw = get_ublox_raw ( lr )
def test_read_ublox_raw ( self ) :
count_gps = 0
count_glonass = 0
for measurements in self . gnss_measurements :
for m in measurements :
if m . constellation_id == ConstellationId . GPS :
count_gps + = 1
elif m . constellation_id == ConstellationId . GLONASS :
count_glonass + = 1
self . assertEqual ( count_gps , 5036 )
self . assertEqual ( count_glonass , 3651 )
def test_get_fix ( self ) :
dog = AstroDog ( )
position_fix_found = 0
count_processed_measurements = 0
count_corrected_measurements = 0
position_fix_found_after_correcting = 0
pos_ests = [ ]
for measurements in self . gnss_measurements [ : self . NUM_TEST_PROCESS_MEAS ] :
processed_meas = process_measurements ( measurements , dog )
count_processed_measurements + = len ( processed_meas )
pos_fix = calc_pos_fix ( processed_meas )
if len ( pos_fix ) > 0 and all ( p != 0 for p in pos_fix [ 0 ] ) :
position_fix_found + = 1
corrected_meas = correct_measurements ( processed_meas , pos_fix [ 0 ] [ : 3 ] , dog )
count_corrected_measurements + = len ( corrected_meas )
pos_fix = calc_pos_fix ( corrected_meas )
if len ( pos_fix ) > 0 and all ( p != 0 for p in pos_fix [ 0 ] ) :
pos_ests . append ( pos_fix [ 0 ] )
position_fix_found_after_correcting + = 1
mean_fix = np . mean ( np . array ( pos_ests ) [ : , : 3 ] , axis = 0 )
np . testing . assert_allclose ( mean_fix , [ - 2452306.662377 , - 4778343.136806 , 3428550.090557 ] , rtol = 0 , atol = 1 )
# Note that can happen that there are less corrected measurements compared to processed when they are invalid.
# However, not for the current segment
self . assertEqual ( position_fix_found , self . NUM_TEST_PROCESS_MEAS )
self . assertEqual ( position_fix_found_after_correcting , self . NUM_TEST_PROCESS_MEAS )
self . assertEqual ( count_processed_measurements , 69 )
self . assertEqual ( count_corrected_measurements , 69 )
@with_processes ( [ ' ubloxd ' ] )
def test_ublox_gps_cutover ( self ) :
time . sleep ( 2 )
ugs = messaging . sub_sock ( " ubloxGnss " , timeout = 0.1 )
ur_pm = messaging . PubMaster ( [ ' ubloxRaw ' ] )
def replay_segment ( ) :
rcv_msgs = [ ]
for msg in self . ublox_raw :
ur_pm . send ( msg . which ( ) , msg . as_builder ( ) )
time . sleep ( 0.01 )
rcv_msgs + = messaging . drain_sock ( ugs )
time . sleep ( 0.1 )
rcv_msgs + = messaging . drain_sock ( ugs )
return rcv_msgs
# replay twice to enforce cutover data on rewind
rcv_msgs = replay_segment ( )
rcv_msgs + = replay_segment ( )
ephems_cnt = sum ( m . ubloxGnss . which ( ) == ' ephemeris ' for m in rcv_msgs )
self . assertEqual ( ephems_cnt , 15 )
if __name__ == " __main__ " :
unittest . main ( )