@ -31,7 +31,14 @@ CACHE_VERSION = 0.1
class Laikad :
def __init__ ( self , valid_const = ( " GPS " , " GLONASS " ) , auto_fetch_orbits = True , auto_update = False , valid_ephem_types = ( EphemerisType . ULTRA_RAPID_ORBIT , EphemerisType . NAV ) ,
save_ephemeris = False , last_known_position = None ) :
save_ephemeris = False ) :
"""
valid_const : GNSS constellation which can be used
auto_fetch_orbits : If true fetch orbits from internet when needed
auto_update : If true download AstroDog will download all files needed . This can be ephemeris or correction data like ionosphere .
valid_ephem_types : Valid ephemeris types to be used by AstroDog
save_ephemeris : If true saves and loads nav and orbit ephemeris to cache .
"""
self . astro_dog = AstroDog ( valid_const = valid_const , auto_update = auto_update , valid_ephem_types = valid_ephem_types , clear_old_ephemeris = True )
self . gnss_kf = GNSSKalman ( GENERATED_DIR , cython = True )
@ -45,7 +52,7 @@ class Laikad:
self . load_cache ( )
self . posfix_functions = { constellation : get_posfix_sympy_fun ( constellation ) for constellation in ( ConstellationId . GPS , ConstellationId . GLONASS ) }
self . last_pos_fix = last_known_position if last_known_position is not None else [ ]
self . last_pos_fix = [ ]
self . last_pos_residual = [ ]
self . last_pos_fix_t = None
@ -64,12 +71,16 @@ class Laikad:
self . last_fetch_orbits_t = cache [ ' last_fetch_orbits_t ' ]
except json . decoder . JSONDecodeError :
cloudlog . exception ( " Error parsing cache " )
timestamp = self . last_fetch_orbits_t . as_datetime ( ) if self . last_fetch_orbits_t is not None else ' Nan '
cloudlog . debug ( f " Loaded nav and orbits cache with timestamp: { timestamp } . Unique orbit and nav sats: { list ( cache [ ' orbits ' ] . keys ( ) ) } { list ( cache [ ' nav ' ] . keys ( ) ) } " +
f " Total: { sum ( [ len ( v ) for v in cache [ ' orbits ' ] ] ) } and { sum ( [ len ( v ) for v in cache [ ' nav ' ] ] ) } " )
def cache_ephemeris ( self , t : GPSTime ) :
if self . save_ephemeris and ( self . last_cached_t is None or t - self . last_cached_t > SECS_IN_MIN ) :
put_nonblocking ( EPHEMERIS_CACHE , json . dumps (
{ ' version ' : CACHE_VERSION , ' last_fetch_orbits_t ' : self . last_fetch_orbits_t , ' orbits ' : self . astro_dog . orbits , ' nav ' : self . astro_dog . nav } ,
cls = CacheSerializer ) )
cloudlog . debug ( " Cache saved " )
self . last_cached_t = t
def get_est_pos ( self , t , processed_measurements ) :
@ -130,9 +141,8 @@ class Laikad:
# Check time and outputs are valid
valid = self . kf_valid ( t )
if not all ( valid ) :
if not valid [ 0 ] :
cloudlog . info ( " Kalman filter uninitialized " )
elif not valid [ 1 ] :
if not valid [ 1 ] :
cloudlog . error ( " Time gap of over 10s detected, gnss kalman reset " )
elif not valid [ 2 ] :
cloudlog . error ( " Gnss kalman filter state is nan " )
@ -140,7 +150,6 @@ class Laikad:
cloudlog . info ( f " Reset kalman filter with { est_pos } " )
self . init_gnss_localizer ( est_pos )
else :
cloudlog . info ( " Could not reset kalman filter " )
return
if len ( measurements ) > 0 :
kf_add_observations ( self . gnss_kf , t , measurements )
@ -189,8 +198,8 @@ def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
try :
astro_dog . get_orbit_data ( t , only_predictions = True )
data = ( astro_dog . orbits , astro_dog . orbit_fetched_times )
except RuntimeError as e :
cloudlog . warning ( f " No orbit data found. { e } " )
except ( RuntimeError , ValueError , IOError ) as e :
cloudlog . warning ( f " No orbit data found or parsing failure: { e } " )
cloudlog . info ( f " Done parsing orbits. Took { time . monotonic ( ) - start_time : .1f } s " )
return data
@ -241,7 +250,7 @@ def kf_add_observations(gnss_kf: GNSSKalman, t: float, measurements: List[GNSSMe
ekf_data [ ObservationKind . PSEUDORANGE_RATE_GPS ] = ekf_data [ ObservationKind . PSEUDORANGE_GPS ]
ekf_data [ ObservationKind . PSEUDORANGE_RATE_GLONASS ] = ekf_data [ ObservationKind . PSEUDORANGE_GLONASS ]
for kind , data in ekf_data . items ( ) :
if len ( data ) > 0 :
if len ( data ) > 0 :
gnss_kf . predict_and_observe ( t , kind , data )
@ -278,7 +287,6 @@ def main(sm=None, pm=None):
pm = messaging . PubMaster ( [ ' gnssMeasurements ' ] )
replay = " REPLAY " in os . environ
# todo get last_known_position
use_internet = " LAIKAD_NO_INTERNET " not in os . environ
laikad = Laikad ( save_ephemeris = not replay , auto_fetch_orbits = use_internet )
while True :