Laikad: process executor to fetch orbits (#24843)

* Use ProcessPoolExecutor to fetch orbits

* update laika repo

* Minor
old-commit-hash: a2d2378ee1
taco
Gijs Koning 3 years ago committed by GitHub
parent 38f6e2726b
commit e03eddea87
  1. 2
      laika_repo
  2. 64
      selfdrive/locationd/laikad.py
  3. 33
      selfdrive/locationd/test/test_laikad.py

@ -1 +1 @@
Subproject commit d87194613455b42af19ff2b5a3f7d1cae5852885
Subproject commit 36f2621fc5348487bb2cd606c37c8c15de0e32cd

@ -1,6 +1,6 @@
#!/usr/bin/env python3
import time
from multiprocessing import Process, Queue
from concurrent.futures import Future, ProcessPoolExecutor
from typing import List, Optional
import numpy as np
@ -10,7 +10,7 @@ from numpy.linalg import linalg
from cereal import log, messaging
from laika import AstroDog
from laika.constants import SECS_IN_MIN
from laika.constants import SECS_IN_HR, SECS_IN_MIN
from laika.ephemeris import EphemerisType, convert_ublox_ephem
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
@ -29,8 +29,9 @@ class Laikad:
def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)):
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
self.gnss_kf = GNSSKalman(GENERATED_DIR)
self.orbit_p: Optional[Process] = None
self.orbit_q = Queue()
self.orbit_fetch_executor = ProcessPoolExecutor()
self.orbit_fetch_future: Optional[Future] = None
self.last_fetch_orbits_t = None
def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False):
if ublox_msg.which == 'measurementReport':
@ -82,7 +83,7 @@ class Laikad:
return dat
elif ublox_msg.which == 'ephemeris':
ephem = convert_ublox_ephem(ublox_msg.ephemeris)
self.astro_dog.add_ephems([ephem], self.astro_dog.nav)
self.astro_dog.add_navs([ephem])
# elif ublox_msg.which == 'ionoData':
# todo add this. Needed to better correct messages offline. First fix ublox_msg.cc to sent them.
@ -100,7 +101,7 @@ class Laikad:
cloudlog.error("Gnss kalman std too far")
if len(pos_fix) == 0:
cloudlog.error("Position fix not available when resetting kalman filter")
cloudlog.warning("Position fix not available when resetting kalman filter")
return
post_est = pos_fix[0][:3].tolist()
self.init_gnss_localizer(post_est)
@ -124,36 +125,33 @@ class Laikad:
self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
def get_orbit_data(self, t: GPSTime, queue):
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
start_time = time.monotonic()
try:
self.astro_dog.get_orbit_data(t, only_predictions=True)
except RuntimeError as e:
cloudlog.info(f"No orbit data found. {e}")
return
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s")
if queue is not None:
queue.put((self.astro_dog.orbits, self.astro_dog.orbit_fetched_times))
def fetch_orbits(self, t: GPSTime, block):
if t not in self.astro_dog.orbit_fetched_times:
if block:
self.get_orbit_data(t, None)
return
if self.orbit_p is None:
self.orbit_p = Process(target=self.get_orbit_data, args=(t, self.orbit_q))
self.orbit_p.start()
if not self.orbit_q.empty():
ret = self.orbit_q.get()
if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR):
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types
if self.orbit_fetch_future is None:
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
if block:
self.orbit_fetch_future.result()
if self.orbit_fetch_future.done():
ret = self.orbit_fetch_future.result()
if ret:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.orbit_p.join()
self.orbit_p = None
def __del__(self):
if self.orbit_p is not None:
self.orbit_p.kill()
self.orbit_fetch_future = None
self.last_fetch_orbits_t = t
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
start_time = time.monotonic()
data = None
try:
astro_dog.get_orbit_data(t, only_predictions=True)
data = (astro_dog.orbits, astro_dog.orbit_fetched_times)
except RuntimeError as e:
cloudlog.info(f"No orbit data found. {e}")
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s")
return data
def create_measurement_msg(meas: GNSSMeasurement):

@ -69,29 +69,46 @@ class TestLaikad(unittest.TestCase):
self.assertEqual(256, len(correct_msgs))
self.assertEqual(256, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
def test_laika_get_orbits(self):
laikad = Laikad(auto_update=False)
first_gps_time = None
def get_first_gps_time(self):
for m in self.logs:
if m.ubloxGnss.which == 'measurementReport':
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
if len(new_meas) != 0:
first_gps_time = new_meas[0].recv_time
break
return new_meas[0].recv_time
def test_laika_get_orbits(self):
laikad = Laikad(auto_update=False)
first_gps_time = self.get_first_gps_time()
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad.fetch_orbits(first_gps_time, block=True)
self.assertEqual(29, len(laikad.astro_dog.orbits.keys()))
self.assertEqual(29, len(laikad.astro_dog.orbits.values()))
self.assertGreater(min([len(v) for v in laikad.astro_dog.orbits.values()]), 0)
@unittest.skip("Use to debug live data")
def test_laika_get_orbits_now(self):
laikad = Laikad(auto_update=False)
laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True)
prn = "G01"
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0)
prn = "R01"
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0)
print(min(laikad.astro_dog.orbits[prn], key=lambda e: e.epoch).epoch.as_datetime())
def test_get_orbits_in_process(self):
laikad = Laikad(auto_update=False)
has_orbits = False
for m in self.logs:
laikad.process_ublox_msg(m.ubloxGnss, m.logMonoTime, block=False)
if laikad.orbit_fetch_future is not None:
laikad.orbit_fetch_future.result()
vals = laikad.astro_dog.orbits.values()
has_orbits = len(vals) > 0 and max([len(v) for v in vals]) > 0
if has_orbits:
break
self.assertTrue(has_orbits)
self.assertGreater(len(laikad.astro_dog.orbit_fetched_times._ranges), 0)
self.assertEqual(None, laikad.orbit_fetch_future)
if __name__ == "__main__":
unittest.main()

Loading…
Cancel
Save