Laikad: Use clocks for faster fetching orbits (#25060)

* Use clocks msg to for first fetch of orbits. Which is sent earlier than ublox msgs

* refactor last_fetch_orbits

* Add comment.
Add test

* increase timeout

* Add clocks to process replay
old-commit-hash: eaf7eb4278
vw-mqb-aeb
Gijs Koning 3 years ago committed by GitHub
parent 5751d166bb
commit c64a61e652
  1. 39
      selfdrive/locationd/laikad.py
  2. 26
      selfdrive/locationd/test/test_laikad.py
  3. 4
      selfdrive/test/process_replay/process_replay.py

@ -5,6 +5,7 @@ import os
import time import time
from collections import defaultdict from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor from concurrent.futures import Future, ProcessPoolExecutor
from datetime import datetime
from enum import IntEnum from enum import IntEnum
from typing import List, Optional from typing import List, Optional
@ -13,7 +14,7 @@ import numpy as np
from cereal import log, messaging from cereal import log, messaging
from common.params import Params, put_nonblocking from common.params import Params, put_nonblocking
from laika import AstroDog from laika import AstroDog
from laika.constants import SECS_IN_MIN from laika.constants import SECS_IN_HR, SECS_IN_MIN
from laika.ephemeris import Ephemeris, EphemerisType, convert_ublox_ephem from laika.ephemeris import Ephemeris, EphemerisType, convert_ublox_ephem
from laika.gps_time import GPSTime from laika.gps_time import GPSTime
from laika.helpers import ConstellationId from laika.helpers import ConstellationId
@ -30,7 +31,8 @@ CACHE_VERSION = 0.1
class Laikad: class Laikad:
def __init__(self, valid_const=("GPS", "GLONASS"), auto_fetch_orbits=True, auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV), def __init__(self, valid_const=("GPS", "GLONASS"), auto_fetch_orbits=True, auto_update=False,
valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV),
save_ephemeris=False): save_ephemeris=False):
""" """
valid_const: GNSS constellation which can be used valid_const: GNSS constellation which can be used
@ -47,6 +49,7 @@ class Laikad:
self.orbit_fetch_future: Optional[Future] = None self.orbit_fetch_future: Optional[Future] = None
self.last_fetch_orbits_t = None self.last_fetch_orbits_t = None
self.got_first_ublox_msg = False
self.last_cached_t = None self.last_cached_t = None
self.save_ephemeris = save_ephemeris self.save_ephemeris = save_ephemeris
self.load_cache() self.load_cache()
@ -72,8 +75,9 @@ class Laikad:
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
cloudlog.exception("Error parsing cache") cloudlog.exception("Error parsing cache")
timestamp = self.last_fetch_orbits_t.as_datetime() if self.last_fetch_orbits_t is not None else 'Nan' timestamp = self.last_fetch_orbits_t.as_datetime() if self.last_fetch_orbits_t is not None else 'Nan'
cloudlog.debug(f"Loaded nav and orbits cache with timestamp: {timestamp}. Unique orbit and nav sats: {list(cache['orbits'].keys())} {list(cache['nav'].keys())} " + cloudlog.debug(
f"Total: {sum([len(v) for v in cache['orbits']])} and {sum([len(v) for v in cache['nav']])}") f"Loaded nav and orbits cache with timestamp: {timestamp}. Unique orbit and nav sats: {list(cache['orbits'].keys())} {list(cache['nav'].keys())} " +
f"Total: {sum([len(v) for v in cache['orbits']])} and {sum([len(v) for v in cache['nav']])}")
def cache_ephemeris(self, t: GPSTime): def cache_ephemeris(self, t: GPSTime):
if self.save_ephemeris and (self.last_cached_t is None or t - self.last_cached_t > SECS_IN_MIN): if self.save_ephemeris and (self.last_cached_t is None or t - self.last_cached_t > SECS_IN_MIN):
@ -98,9 +102,10 @@ class Laikad:
t = ublox_mono_time * 1e-9 t = ublox_mono_time * 1e-9
report = ublox_msg.measurementReport report = ublox_msg.measurementReport
if report.gpsWeek > 0: if report.gpsWeek > 0:
self.got_first_ublox_msg = True
latest_msg_t = GPSTime(report.gpsWeek, report.rcvTow) latest_msg_t = GPSTime(report.gpsWeek, report.rcvTow)
if self.auto_fetch_orbits: if self.auto_fetch_orbits:
self.fetch_orbits(latest_msg_t + SECS_IN_MIN, block) self.fetch_orbits(latest_msg_t, block)
new_meas = read_raw_ublox(report) new_meas = read_raw_ublox(report)
# Filter measurements with unexpected pseudoranges for GPS and GLONASS satellites # Filter measurements with unexpected pseudoranges for GPS and GLONASS satellites
@ -174,24 +179,26 @@ class Laikad:
self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag) self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
def fetch_orbits(self, t: GPSTime, block): def fetch_orbits(self, t: GPSTime, block):
if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_MIN): # Download new orbits if 1 hour of orbits data left
if t + SECS_IN_HR not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or abs(t - self.last_fetch_orbits_t) > SECS_IN_MIN):
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types
ret = None ret = None
if block: if block: # Used for testing purposes
ret = get_orbit_data(t, *astro_dog_vars) ret = get_orbit_data(t, *astro_dog_vars)
elif self.orbit_fetch_future is None: elif self.orbit_fetch_future is None:
self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1) self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1)
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars) self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
elif self.orbit_fetch_future.done(): elif self.orbit_fetch_future.done():
self.last_fetch_orbits_t = t
ret = self.orbit_fetch_future.result() ret = self.orbit_fetch_future.result()
self.orbit_fetch_executor = self.orbit_fetch_future = None self.orbit_fetch_executor = self.orbit_fetch_future = None
if ret is not None: if ret is not None:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret if ret[0] is None:
self.cache_ephemeris(t=t) self.last_fetch_orbits_t = ret[2]
else:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times, self.last_fetch_orbits_t = ret
self.cache_ephemeris(t=t)
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types): def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
@ -201,9 +208,10 @@ def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
try: try:
astro_dog.get_orbit_data(t, only_predictions=True) astro_dog.get_orbit_data(t, only_predictions=True)
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s") cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s")
return astro_dog.orbits, astro_dog.orbit_fetched_times return astro_dog.orbits, astro_dog.orbit_fetched_times, t
except (RuntimeError, ValueError, IOError) as e: except (RuntimeError, ValueError, IOError) as e:
cloudlog.warning(f"No orbit data found or parsing failure: {e}") cloudlog.warning(f"No orbit data found or parsing failure: {e}")
return None, None, t
def create_measurement_msg(meas: GNSSMeasurement): def create_measurement_msg(meas: GNSSMeasurement):
@ -284,7 +292,7 @@ class EphemerisSourceType(IntEnum):
def main(sm=None, pm=None): def main(sm=None, pm=None):
if sm is None: if sm is None:
sm = messaging.SubMaster(['ubloxGnss']) sm = messaging.SubMaster(['ubloxGnss', 'clocks'])
if pm is None: if pm is None:
pm = messaging.PubMaster(['gnssMeasurements']) pm = messaging.PubMaster(['gnssMeasurements'])
@ -299,6 +307,11 @@ def main(sm=None, pm=None):
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'], block=replay) msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'], block=replay)
if msg is not None: if msg is not None:
pm.send('gnssMeasurements', msg) pm.send('gnssMeasurements', msg)
if not laikad.got_first_ublox_msg and sm.updated['clocks']:
clocks_msg = sm['clocks']
t = GPSTime.from_datetime(datetime.utcfromtimestamp(clocks_msg.wallTimeNanos * 1E-9))
if laikad.auto_fetch_orbits:
laikad.fetch_orbits(t, block=replay)
if __name__ == "__main__": if __name__ == "__main__":

@ -67,7 +67,7 @@ class TestLaikad(unittest.TestCase):
gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1)) gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1))
laikad = Laikad() laikad = Laikad()
laikad.fetch_orbits(gpstime, block=False) laikad.fetch_orbits(gpstime, block=False)
laikad.orbit_fetch_future.result(5) laikad.orbit_fetch_future.result(30)
# Get results and save orbits to laikad: # Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime, block=False) laikad.fetch_orbits(gpstime, block=False)
@ -75,7 +75,7 @@ class TestLaikad(unittest.TestCase):
self.assertIsNotNone(ephem) self.assertIsNotNone(ephem)
laikad.fetch_orbits(gpstime+2*SECS_IN_DAY, block=False) laikad.fetch_orbits(gpstime+2*SECS_IN_DAY, block=False)
laikad.orbit_fetch_future.result(5) laikad.orbit_fetch_future.result(30)
# Get results and save orbits to laikad: # Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime + 2 * SECS_IN_DAY, block=False) laikad.fetch_orbits(gpstime + 2 * SECS_IN_DAY, block=False)
@ -83,6 +83,28 @@ class TestLaikad(unittest.TestCase):
self.assertIsNotNone(ephem) self.assertIsNotNone(ephem)
self.assertNotEqual(ephem, ephem2) self.assertNotEqual(ephem, ephem2)
def test_fetch_orbits_with_wrong_clocks(self):
laikad = Laikad()
def check_has_orbits():
self.assertGreater(len(laikad.astro_dog.orbits), 0)
ephem = laikad.astro_dog.orbits['G01'][0]
self.assertIsNotNone(ephem)
real_current_time = GPSTime.from_datetime(datetime(2021, month=3, day=1))
wrong_future_clock_time = real_current_time + SECS_IN_DAY
laikad.fetch_orbits(wrong_future_clock_time, block=True)
check_has_orbits()
self.assertEqual(laikad.last_fetch_orbits_t, wrong_future_clock_time)
# Test fetching orbits with earlier time
assert real_current_time < laikad.last_fetch_orbits_t
laikad.astro_dog.orbits = {}
laikad.fetch_orbits(real_current_time, block=True)
check_has_orbits()
self.assertEqual(laikad.last_fetch_orbits_t, real_current_time)
def test_ephemeris_source_in_msg(self): def test_ephemeris_source_in_msg(self):
data_mock = defaultdict(str) data_mock = defaultdict(str)
data_mock['sv_id'] = 1 data_mock['sv_id'] = 1

@ -239,7 +239,7 @@ def ublox_rcv_callback(msg):
def laika_rcv_callback(msg, CP, cfg, fsm): def laika_rcv_callback(msg, CP, cfg, fsm):
if msg.ubloxGnss.which() == "measurementReport": if msg.which() == 'ubloxGnss' and msg.ubloxGnss.which() == "measurementReport":
return ["gnssMeasurements"], True return ["gnssMeasurements"], True
else: else:
return [], True return [], True
@ -352,6 +352,7 @@ CONFIGS = [
subtest_name="Offline", subtest_name="Offline",
pub_sub={ pub_sub={
"ubloxGnss": ["gnssMeasurements"], "ubloxGnss": ["gnssMeasurements"],
"clocks": []
}, },
ignore=["logMonoTime"], ignore=["logMonoTime"],
init_callback=get_car_params, init_callback=get_car_params,
@ -364,6 +365,7 @@ CONFIGS = [
proc_name="laikad", proc_name="laikad",
pub_sub={ pub_sub={
"ubloxGnss": ["gnssMeasurements"], "ubloxGnss": ["gnssMeasurements"],
"clocks": []
}, },
ignore=["logMonoTime"], ignore=["logMonoTime"],
init_callback=get_car_params, init_callback=get_car_params,

Loading…
Cancel
Save