@ -1,4 +1,6 @@
#!/usr/bin/env python3
import threading
import time
from typing import List
import numpy as np
@ -8,7 +10,9 @@ from numpy.linalg import linalg
from cereal import log , messaging
from laika import AstroDog
from laika . ephemeris import convert_ublox_ephem
from laika . constants import SECS_IN_HR , SECS_IN_MIN
from laika . ephemeris import EphemerisType , convert_ublox_ephem
from laika . gps_time import GPSTime
from laika . helpers import ConstellationId
from laika . raw_gnss import GNSSMeasurement , calc_pos_fix , correct_measurements , process_measurements , read_raw_ublox
from selfdrive . locationd . models . constants import GENERATED_DIR , ObservationKind
@ -22,14 +26,18 @@ MAX_TIME_GAP = 10
class Laikad :
def __init__ ( self , use_internet ) :
self . astro_dog = AstroDog ( use_internet = use_internet )
def __init__ ( self , valid_const = ( " GPS " , ) , auto_update = False , valid_ephem_types = ( EphemerisType . ULTRA_RAPID_ORBIT , EphemerisType . NAV ) ) :
self . astro_dog = AstroDog ( valid_const = valid_const , use_internet = auto_update , valid_ephem_types = valid_ephem_types )
self . gnss_kf = GNSSKalman ( GENERATED_DIR )
self . latest_epoch_fetched = GPSTime ( 0 , 0 )
self . latest_time_msg = None
def process_ublox_msg ( self , ublox_msg , ublox_mono_time : int ) :
if ublox_msg . which == ' measurementReport ' :
report = ublox_msg . measurementReport
new_meas = read_raw_ublox ( report )
if report . gpsWeek > 0 :
self . latest_time_msg = GPSTime ( report . gpsWeek , report . rcvTow )
measurements = process_measurements ( new_meas , self . astro_dog )
pos_fix = calc_pos_fix ( measurements , min_measurements = 4 )
# To get a position fix a minimum of 5 measurements are needed.
@ -41,6 +49,7 @@ class Laikad:
t = ublox_mono_time * 1e-9
self . update_localizer ( pos_fix , t , corrected_measurements )
localizer_valid = self . localizer_valid ( t )
ecef_pos = self . gnss_kf . x [ GStates . ECEF_POS ] . tolist ( )
ecef_vel = self . gnss_kf . x [ GStates . ECEF_VELOCITY ] . tolist ( )
@ -50,7 +59,6 @@ class Laikad:
bearing_deg , bearing_std = get_bearing_from_gnss ( ecef_pos , ecef_vel , vel_std )
meas_msgs = [ create_measurement_msg ( m ) for m in corrected_measurements ]
dat = messaging . new_message ( " gnssMeasurements " )
measurement_msg = log . LiveLocationKalman . Measurement . new_message
dat . gnssMeasurements = {
@ -63,7 +71,7 @@ class Laikad:
return dat
elif ublox_msg . which == ' ephemeris ' :
ephem = convert_ublox_ephem ( ublox_msg . ephemeris )
self . astro_dog . add_ephem ( ephem , self . astro_dog . orbits )
self . astro_dog . add_ephems ( [ ephem ] , self . astro_dog . nav )
# elif ublox_msg.which == 'ionoData':
# todo add this. Needed to better correct messages offline. First fix ublox_msg.cc to sent them.
@ -90,8 +98,7 @@ class Laikad:
def localizer_valid ( self , t : float ) :
filter_time = self . gnss_kf . filter . filter_time
return filter_time is not None and ( t - filter_time ) < MAX_TIME_GAP and \
all ( np . isfinite ( self . gnss_kf . x [ GStates . ECEF_POS ] ) )
return filter_time is not None and ( t - filter_time ) < MAX_TIME_GAP and all ( np . isfinite ( self . gnss_kf . x [ GStates . ECEF_POS ] ) )
def init_gnss_localizer ( self , est_pos ) :
x_initial , p_initial_diag = np . copy ( GNSSKalman . x_initial ) , np . copy ( np . diagonal ( GNSSKalman . P_initial ) )
@ -100,6 +107,22 @@ class Laikad:
self . gnss_kf . init_state ( x_initial , covs_diag = p_initial_diag )
def orbit_thread ( self , end_event : threading . Event ) :
while not end_event . is_set ( ) :
if self . latest_time_msg :
self . fetch_orbits ( self . latest_time_msg )
time . sleep ( 0.1 )
def fetch_orbits ( self , t : GPSTime ) :
if self . latest_epoch_fetched < t + SECS_IN_MIN :
cloudlog . info ( " Start to download/parse orbits " )
orbit_ephems = self . astro_dog . download_parse_orbit_data ( t , skip_before_epoch = t - 2 * SECS_IN_HR )
if len ( orbit_ephems ) > 0 :
cloudlog . info ( f " downloaded and parsed correctly new orbits { len ( orbit_ephems ) } , Constellations: { set ( [ e . prn [ 0 ] for e in orbit_ephems ] ) } " )
self . astro_dog . add_ephems ( orbit_ephems , self . astro_dog . orbits )
latest_orbit = max ( orbit_ephems , key = lambda e : e . epoch ) # type: ignore
self . latest_epoch_fetched = latest_orbit . epoch
def create_measurement_msg ( meas : GNSSMeasurement ) :
c = log . GnssMeasurements . CorrectedMeasurement . new_message ( )
@ -144,16 +167,22 @@ def main():
sm = messaging . SubMaster ( [ ' ubloxGnss ' ] )
pm = messaging . PubMaster ( [ ' gnssMeasurements ' ] )
laikad = Laikad ( use_internet = True )
while True :
sm . update ( )
if sm . updated [ ' ubloxGnss ' ] :
ublox_msg = sm [ ' ubloxGnss ' ]
msg = laikad . process_ublox_msg ( ublox_msg , sm . logMonoTime [ ' ubloxGnss ' ] )
if msg is not None :
pm . send ( ' gnssMeasurements ' , msg )
laikad = Laikad ( )
end_event = threading . Event ( )
threading . Thread ( target = laikad . orbit_thread , args = ( end_event , ) ) . start ( )
try :
while not end_event . is_set ( ) :
sm . update ( )
if sm . updated [ ' ubloxGnss ' ] :
ublox_msg = sm [ ' ubloxGnss ' ]
msg = laikad . process_ublox_msg ( ublox_msg , sm . logMonoTime [ ' ubloxGnss ' ] )
if msg is not None :
pm . send ( ' gnssMeasurements ' , msg )
except ( KeyboardInterrupt , SystemExit ) :
end_event . set ( )
raise
if __name__ == " __main__ " :