@ -6,11 +6,9 @@ from typing import List
import numpy as np
import numpy as np
from collections import defaultdict
from collections import defaultdict
from numpy . linalg import linalg
from cereal import log , messaging
from cereal import log , messaging
from laika import AstroDog
from laika import AstroDog
from laika . constants import SECS_IN_HR , SECS_IN_ MIN
from laika . constants import SECS_IN_MIN
from laika . ephemeris import EphemerisType , convert_ublox_ephem
from laika . ephemeris import EphemerisType , convert_ublox_ephem
from laika . gps_time import GPSTime
from laika . gps_time import GPSTime
from laika . helpers import ConstellationId
from laika . helpers import ConstellationId
@ -26,10 +24,9 @@ MAX_TIME_GAP = 10
class Laikad :
class Laikad :
def __init__ ( self , valid_const = ( " GPS " , ) , auto_update = False , valid_ephem_types = ( EphemerisType . ULTRA_RAPID_ORBIT , EphemerisType . NAV ) ) :
def __init__ ( self , valid_const = ( " GPS " , " GLONASS " ) , auto_update = False , valid_ephem_types = ( EphemerisType . ULTRA_RAPID_ORBIT , EphemerisType . NAV ) ) :
self . astro_dog = AstroDog ( valid_const = valid_const , use_internet = auto_update , valid_ephem_types = valid_ephem_types )
self . astro_dog = AstroDog ( valid_const = valid_const , auto_update = auto_update , valid_ephem_types = valid_ephem_types )
self . gnss_kf = GNSSKalman ( GENERATED_DIR )
self . gnss_kf = GNSSKalman ( GENERATED_DIR )
self . latest_epoch_fetched = GPSTime ( 0 , 0 )
self . latest_time_msg = None
self . latest_time_msg = None
def process_ublox_msg ( self , ublox_msg , ublox_mono_time : int ) :
def process_ublox_msg ( self , ublox_msg , ublox_mono_time : int ) :
@ -43,7 +40,7 @@ class Laikad:
# To get a position fix a minimum of 5 measurements are needed.
# To get a position fix a minimum of 5 measurements are needed.
# Each report can contain less and some measurements can't be processed.
# Each report can contain less and some measurements can't be processed.
corrected_measurements = [ ]
corrected_measurements = [ ]
if len ( pos_fix ) > 0 and linalg . norm ( pos_fix [ 1 ] ) < 100 :
if len ( pos_fix ) > 0 and abs ( np . array ( pos_fix [ 1 ] ) ) . mean ( ) < 10 00 :
corrected_measurements = correct_measurements ( measurements , pos_fix [ 0 ] [ : 3 ] , self . astro_dog )
corrected_measurements = correct_measurements ( measurements , pos_fix [ 0 ] [ : 3 ] , self . astro_dog )
t = ublox_mono_time * 1e-9
t = ublox_mono_time * 1e-9
@ -110,18 +107,15 @@ class Laikad:
def orbit_thread ( self , end_event : threading . Event ) :
def orbit_thread ( self , end_event : threading . Event ) :
while not end_event . is_set ( ) :
while not end_event . is_set ( ) :
if self . latest_time_msg :
if self . latest_time_msg :
self . fetch_orbits ( self . latest_time_msg )
self . fetch_orbits ( self . latest_time_msg + SECS_IN_MIN )
time . sleep ( 0.1 )
time . sleep ( 0.1 )
def fetch_orbits ( self , t : GPSTime ) :
def fetch_orbits ( self , t : GPSTime ) :
if self . latest_epoch_fetched < t + SECS_IN_MIN :
if t not in self . astro_dog . orbit_fetched_times :
cloudlog . info ( " Start to download/parse orbits " )
cloudlog . info ( f " Start to download/parse orbits for time { t . as_datetime ( ) } " )
orbit_ephems = self . astro_dog . download_parse_orbit_data ( t , skip_before_epoch = t - 2 * SECS_IN_HR )
start_time = time . monotonic ( )
if len ( orbit_ephems ) > 0 :
self . astro_dog . get_orbit_data ( t , only_predictions = True )
cloudlog . info ( f " downloaded and parsed correctly new orbits { len ( orbit_ephems ) } , Constellations: { set ( [ e . prn [ 0 ] for e in orbit_ephems ] ) } " )
cloudlog . info ( f " Done parsing orbits. Took { time . monotonic ( ) - start_time : .2f } s " )
self . astro_dog . add_ephems ( orbit_ephems , self . astro_dog . orbits )
latest_orbit = max ( orbit_ephems , key = lambda e : e . epoch ) # type: ignore
self . latest_epoch_fetched = latest_orbit . epoch
def create_measurement_msg ( meas : GNSSMeasurement ) :
def create_measurement_msg ( meas : GNSSMeasurement ) :