Laikad: Cache orbit and nav data (#24831)

* Cache orbit and nav data

* Cleanup

* Cleanup

* Use ProcessPoolExecutor to fetch orbits

* update laika repo

* Minor

* Create json de/serializers
  Save the cache at most once per minute (a sketch of this flow follows the changed-file list below)

* Update laika repo

* Speed up json by caching json in ephemeris class

* Update laika

* Fix test

* Use constant
old-commit-hash: c3fa9151f3
Authored by Gijs Koning, committed by GitHub
parent 87b6182aac
commit 61dcb8729c
Changed files (lines changed):
  common/params.cc (1)
  laika_repo (2)
  selfdrive/locationd/laikad.py (64)
  selfdrive/locationd/test/test_laikad.py (66)
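
The squashed commits above describe the overall pattern: serialize the AstroDog's orbit and nav ephemerides to JSON, store them in the new persistent `LaikadEphemeris` param at most once per minute, and reload them on startup. Below is a minimal sketch of that flow, condensed from the laikad.py diff further down; helper names like `save_cache` and `load_cache` are illustrative, not part of the commit, and the openpilot environment is assumed.

```python
# Condensed sketch of the caching flow (illustrative helper names; the real code is in
# selfdrive/locationd/laikad.py below).
import json

from common.params import Params, put_nonblocking
from laika.constants import SECS_IN_MIN

EPHEMERIS_CACHE = 'LaikadEphemeris'  # the persistent param registered in common/params.cc


def save_cache(astro_dog, last_fetch_orbits_t, last_cached_t, t, serializer_cls):
  # Rate-limit writes: persist at most once per minute of GPS time.
  if last_cached_t is not None and t - last_cached_t <= SECS_IN_MIN:
    return last_cached_t
  blob = json.dumps({'orbits': astro_dog.orbits, 'nav': astro_dog.nav,
                     'last_fetch_orbits_t': last_fetch_orbits_t}, cls=serializer_cls)
  put_nonblocking(EPHEMERIS_CACHE, blob)  # asynchronous write keeps the GNSS loop responsive
  return t


def load_cache(astro_dog, hook):
  # On startup, restore whatever was cached so orbits need not be re-downloaded immediately.
  cache = Params().get(EPHEMERIS_CACHE)
  if cache:
    data = json.loads(cache, object_hook=hook)
    astro_dog.add_orbits(data['orbits'])
    astro_dog.add_navs(data['nav'])
    return data['last_fetch_orbits_t']
  return None
```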

common/params.cc
@@ -127,6 +127,7 @@ std::unordered_map<std::string, uint32_t> keys = {
     {"IsTakingSnapshot", CLEAR_ON_MANAGER_START},
     {"IsUpdateAvailable", CLEAR_ON_MANAGER_START},
     {"JoystickDebugMode", CLEAR_ON_MANAGER_START | CLEAR_ON_IGNITION_OFF},
+    {"LaikadEphemeris", PERSISTENT},
     {"LastAthenaPingTime", CLEAR_ON_MANAGER_START},
     {"LastGPSPosition", PERSISTENT},
     {"LastManagerExitReason", CLEAR_ON_MANAGER_START},

laika_repo
@@ -1 +1 @@
-Subproject commit 36f2621fc5348487bb2cd606c37c8c15de0e32cd
+Subproject commit a3a80dc4f7977b2232946e56a16770e413190818

selfdrive/locationd/laikad.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+import json
 import time
 from concurrent.futures import Future, ProcessPoolExecutor
 from typing import List, Optional
@@ -9,9 +10,10 @@ from collections import defaultdict
 from numpy.linalg import linalg

 from cereal import log, messaging
+from common.params import Params, put_nonblocking
 from laika import AstroDog
 from laika.constants import SECS_IN_HR, SECS_IN_MIN
-from laika.ephemeris import EphemerisType, convert_ublox_ephem
+from laika.ephemeris import Ephemeris, EphemerisType, convert_ublox_ephem
 from laika.gps_time import GPSTime
 from laika.helpers import ConstellationId
 from laika.raw_gnss import GNSSMeasurement, calc_pos_fix, correct_measurements, process_measurements, read_raw_ublox
@@ -22,16 +24,40 @@ import common.transformations.coordinates as coord
 from system.swaglog import cloudlog

 MAX_TIME_GAP = 10
+EPHEMERIS_CACHE = 'LaikadEphemeris'
+CACHE_VERSION = 0.1


 class Laikad:
-  def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)):
-    self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
+  def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV),
+               save_ephemeris=False):
+    self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True)
     self.gnss_kf = GNSSKalman(GENERATED_DIR)
     self.orbit_fetch_executor = ProcessPoolExecutor()
     self.orbit_fetch_future: Optional[Future] = None
     self.last_fetch_orbits_t = None
+    self.last_cached_t = None
+    self.save_ephemeris = save_ephemeris
+    self.load_cache()
+
+  def load_cache(self):
+    cache = Params().get(EPHEMERIS_CACHE)
+    if not cache:
+      return
+    try:
+      cache = json.loads(cache, object_hook=deserialize_hook)
+      self.astro_dog.add_orbits(cache['orbits'])
+      self.astro_dog.add_navs(cache['nav'])
+      self.last_fetch_orbits_t = cache['last_fetch_orbits_t']
+    except json.decoder.JSONDecodeError:
+      cloudlog.exception("Error parsing cache")
+
+  def cache_ephemeris(self, t: GPSTime):
+    if self.save_ephemeris and (self.last_cached_t is None or t - self.last_cached_t > SECS_IN_MIN):
+      put_nonblocking(EPHEMERIS_CACHE, json.dumps(
+        {'version': CACHE_VERSION, 'last_fetch_orbits_t': self.last_fetch_orbits_t, 'orbits': self.astro_dog.orbits, 'nav': self.astro_dog.nav},
+        cls=CacheSerializer))
+      self.last_cached_t = t

   def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False):
     if ublox_msg.which == 'measurementReport':
@@ -83,7 +109,8 @@ class Laikad:
       return dat
     elif ublox_msg.which == 'ephemeris':
       ephem = convert_ublox_ephem(ublox_msg.ephemeris)
-      self.astro_dog.add_navs([ephem])
+      self.astro_dog.add_navs({ephem.prn: [ephem]})
+      self.cache_ephemeris(t=ephem.epoch)

     # elif ublox_msg.which == 'ionoData':
       # todo add this. Needed to better correct messages offline. First fix ublox_msg.cc to sent them.
@@ -101,7 +128,7 @@ class Laikad:
       cloudlog.error("Gnss kalman std too far")

     if len(pos_fix) == 0:
-      cloudlog.warning("Position fix not available when resetting kalman filter")
+      cloudlog.info("Position fix not available when resetting kalman filter")
       return
     post_est = pos_fix[0][:3].tolist()
     self.init_gnss_localizer(post_est)
@@ -134,10 +161,11 @@
           self.orbit_fetch_future.result()
       if self.orbit_fetch_future.done():
         ret = self.orbit_fetch_future.result()
+        self.last_fetch_orbits_t = t
         if ret:
           self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
+          self.cache_ephemeris(t=t)
         self.orbit_fetch_future = None
-      self.last_fetch_orbits_t = t


 def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
@@ -193,11 +221,31 @@ def get_bearing_from_gnss(ecef_pos, ecef_vel, vel_std):
   return float(np.rad2deg(bearing)), float(bearing_std)


+class CacheSerializer(json.JSONEncoder):
+  def default(self, o):
+    if isinstance(o, Ephemeris):
+      return o.to_json()
+    if isinstance(o, GPSTime):
+      return o.__dict__
+    if isinstance(o, np.ndarray):
+      return o.tolist()
+    return json.JSONEncoder.default(self, o)
+
+
+def deserialize_hook(dct):
+  if 'ephemeris' in dct:
+    return Ephemeris.from_json(dct)
+  if 'week' in dct:
+    return GPSTime(dct['week'], dct['tow'])
+  return dct
+
+
 def main():
   sm = messaging.SubMaster(['ubloxGnss'])
   pm = messaging.PubMaster(['gnssMeasurements'])

-  laikad = Laikad()
+  laikad = Laikad(save_ephemeris=True)
   while True:
     sm.update()
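
A minimal usage sketch for the `CacheSerializer` / `deserialize_hook` pair added above, assuming the openpilot environment so that `laika` and `selfdrive.locationd.laikad` are importable. `GPSTime` round-trips through its `week`/`tow` attributes, `Ephemeris` objects go through their own `to_json`/`from_json` (added with the laika submodule bump, presumably emitting the `'ephemeris'` field the hook keys off), and numpy arrays come back as plain lists.

```python
import json

import numpy as np
from laika.gps_time import GPSTime
from selfdrive.locationd.laikad import CacheSerializer, deserialize_hook

# Encode a GPSTime and an ndarray the same way cache_ephemeris encodes the AstroDog state.
blob = json.dumps({'last_fetch_orbits_t': GPSTime(2100, 42.0), 'pos_std': np.zeros(3)},
                  cls=CacheSerializer)

# Decode with the matching hook, as load_cache does.
restored = json.loads(blob, object_hook=deserialize_hook)
assert isinstance(restored['last_fetch_orbits_t'], GPSTime)  # rebuilt from 'week'/'tow'
assert restored['pos_std'] == [0.0, 0.0, 0.0]                # lists, not ndarrays, after the round trip
```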

selfdrive/locationd/test/test_laikad.py
@@ -1,14 +1,16 @@
 #!/usr/bin/env python3
+import time
 import unittest
 from datetime import datetime
 from unittest import mock
-from unittest.mock import Mock
+from unittest.mock import Mock, patch

+from common.params import Params
 from laika.ephemeris import EphemerisType
 from laika.gps_time import GPSTime
-from laika.helpers import ConstellationId
+from laika.helpers import ConstellationId, TimeRangeHolder
 from laika.raw_gnss import GNSSMeasurement, read_raw_ublox
-from selfdrive.locationd.laikad import Laikad, create_measurement_msg
+from selfdrive.locationd.laikad import EPHEMERIS_CACHE, Laikad, create_measurement_msg
 from selfdrive.test.openpilotci import get_url
 from tools.lib.logreader import LogReader
@@ -20,12 +22,14 @@ def get_log(segs=range(0)):
   return [m for m in logs if m.which() == 'ubloxGnss']


-def verify_messages(lr, laikad):
+def verify_messages(lr, laikad, return_one_success=False):
   good_msgs = []
   for m in lr:
     msg = laikad.process_ublox_msg(m.ubloxGnss, m.logMonoTime, block=True)
     if msg is not None and len(msg.gnssMeasurements.correctedMeasurements) > 0:
       good_msgs.append(msg)
+      if return_one_success:
+        return msg
   return good_msgs
@@ -35,6 +39,9 @@ class TestLaikad(unittest.TestCase):
   def setUpClass(cls):
     cls.logs = get_log(range(1))

+  def setUp(self):
+    Params().delete(EPHEMERIS_CACHE)
+
   def test_create_msg_without_errors(self):
     gpstime = GPSTime.from_datetime(datetime.now())
     meas = GNSSMeasurement(ConstellationId.GPS, 1, gpstime.week, gpstime.tow, {'C1C': 0., 'D1C': 0.}, {'C1C': 0., 'D1C': 0.})
@@ -81,8 +88,7 @@
     first_gps_time = self.get_first_gps_time()
     # Pretend process has loaded the orbits on startup by using the time of the first gps message.
     laikad.fetch_orbits(first_gps_time, block=True)
-    self.assertEqual(29, len(laikad.astro_dog.orbits.values()))
-    self.assertGreater(min([len(v) for v in laikad.astro_dog.orbits.values()]), 0)
+    self.dict_has_values(laikad.astro_dog.orbits)

   @unittest.skip("Use to debug live data")
   def test_laika_get_orbits_now(self):
@@ -109,6 +115,54 @@
     self.assertGreater(len(laikad.astro_dog.orbit_fetched_times._ranges), 0)
     self.assertEqual(None, laikad.orbit_fetch_future)

+  def test_cache(self):
+    laikad = Laikad(auto_update=True, save_ephemeris=True)
+    first_gps_time = self.get_first_gps_time()
+
+    def wait_for_cache():
+      max_time = 2
+      while Params().get(EPHEMERIS_CACHE) is None:
+        time.sleep(0.1)
+        max_time -= 0.1
+        if max_time == 0:
+          self.fail("Cache has not been written after 2 seconds")
+
+    # Test cache with no ephemeris
+    laikad.cache_ephemeris(t=GPSTime(0, 0))
+    wait_for_cache()
+    Params().delete(EPHEMERIS_CACHE)
+
+    laikad.astro_dog.get_navs(first_gps_time)
+    laikad.fetch_orbits(first_gps_time, block=True)
+
+    # Wait for cache to save
+    wait_for_cache()
+
+    # Check both nav and orbits separate
+    laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV)
+    # Verify orbits and nav are loaded from cache
+    self.dict_has_values(laikad.astro_dog.orbits)
+    self.dict_has_values(laikad.astro_dog.nav)
+    # Verify cache is working for only nav by running a segment
+    msg = verify_messages(self.logs, laikad, return_one_success=True)
+    self.assertIsNotNone(msg)
+
+    with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method:
+      # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently
+      laikad.astro_dog.orbit_fetched_times = TimeRangeHolder()
+      laikad.fetch_orbits(first_gps_time, block=False)
+      mock_method.assert_not_called()
+
+      # Verify cache is working for only orbits by running a segment
+      laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT)
+      msg = verify_messages(self.logs, laikad, return_one_success=True)
+      self.assertIsNotNone(msg)
+      # Verify orbit data is not downloaded
+      mock_method.assert_not_called()
+
+  def dict_has_values(self, dct):
+    self.assertGreater(len(dct), 0)
+    self.assertGreater(min([len(v) for v in dct.values()]), 0)


 if __name__ == "__main__":
   unittest.main()
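
One small note on `test_cache` above: `wait_for_cache` decrements `max_time` by 0.1 and compares it to 0 with `==`, and floating-point rounding can keep that comparison from ever becoming true, so the timeout may never fire if the cache is never written. A deadline-based variant (a sketch, not part of the commit) bounds the wait reliably:

```python
# Sketch of a deadline-based wait helper; pass in the TestCase so failures surface via self.fail().
import time

from common.params import Params
from selfdrive.locationd.laikad import EPHEMERIS_CACHE


def wait_for_cache(test_case, timeout=2.0):
  deadline = time.monotonic() + timeout
  while Params().get(EPHEMERIS_CACHE) is None:
    if time.monotonic() > deadline:
      test_case.fail(f"Cache has not been written after {timeout} seconds")
    time.sleep(0.1)
```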
