From e03eddea87bfbef2aaf24fd630e2c6c521cf8189 Mon Sep 17 00:00:00 2001 From: Gijs Koning Date: Mon, 13 Jun 2022 14:02:31 +0200 Subject: [PATCH] Laikad: process executor to fetch orbits (#24843) * Use ProcessPoolExecutor to fetch orbits * update laika repo * Minor old-commit-hash: a2d2378ee147f2db59aecd3401677cb2427d1ced --- laika_repo | 2 +- selfdrive/locationd/laikad.py | 64 ++++++++++++------------- selfdrive/locationd/test/test_laikad.py | 33 +++++++++---- 3 files changed, 57 insertions(+), 42 deletions(-) diff --git a/laika_repo b/laika_repo index d871946134..36f2621fc5 160000 --- a/laika_repo +++ b/laika_repo @@ -1 +1 @@ -Subproject commit d87194613455b42af19ff2b5a3f7d1cae5852885 +Subproject commit 36f2621fc5348487bb2cd606c37c8c15de0e32cd diff --git a/selfdrive/locationd/laikad.py b/selfdrive/locationd/laikad.py index 42c4156795..33e41398a0 100755 --- a/selfdrive/locationd/laikad.py +++ b/selfdrive/locationd/laikad.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 import time -from multiprocessing import Process, Queue +from concurrent.futures import Future, ProcessPoolExecutor from typing import List, Optional import numpy as np @@ -10,7 +10,7 @@ from numpy.linalg import linalg from cereal import log, messaging from laika import AstroDog -from laika.constants import SECS_IN_MIN +from laika.constants import SECS_IN_HR, SECS_IN_MIN from laika.ephemeris import EphemerisType, convert_ublox_ephem from laika.gps_time import GPSTime from laika.helpers import ConstellationId @@ -29,8 +29,9 @@ class Laikad: def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)): self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types) self.gnss_kf = GNSSKalman(GENERATED_DIR) - self.orbit_p: Optional[Process] = None - self.orbit_q = Queue() + self.orbit_fetch_executor = ProcessPoolExecutor() + self.orbit_fetch_future: Optional[Future] = None + self.last_fetch_orbits_t = None def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False): if ublox_msg.which == 'measurementReport': @@ -82,7 +83,7 @@ class Laikad: return dat elif ublox_msg.which == 'ephemeris': ephem = convert_ublox_ephem(ublox_msg.ephemeris) - self.astro_dog.add_ephems([ephem], self.astro_dog.nav) + self.astro_dog.add_navs([ephem]) # elif ublox_msg.which == 'ionoData': # todo add this. Needed to better correct messages offline. First fix ublox_msg.cc to sent them. @@ -100,7 +101,7 @@ class Laikad: cloudlog.error("Gnss kalman std too far") if len(pos_fix) == 0: - cloudlog.error("Position fix not available when resetting kalman filter") + cloudlog.warning("Position fix not available when resetting kalman filter") return post_est = pos_fix[0][:3].tolist() self.init_gnss_localizer(post_est) @@ -124,36 +125,33 @@ class Laikad: self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag) - def get_orbit_data(self, t: GPSTime, queue): - cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}") - start_time = time.monotonic() - try: - self.astro_dog.get_orbit_data(t, only_predictions=True) - except RuntimeError as e: - cloudlog.info(f"No orbit data found. {e}") - return - cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s") - if queue is not None: - queue.put((self.astro_dog.orbits, self.astro_dog.orbit_fetched_times)) - def fetch_orbits(self, t: GPSTime, block): - if t not in self.astro_dog.orbit_fetched_times: - if block: - self.get_orbit_data(t, None) - return - if self.orbit_p is None: - self.orbit_p = Process(target=self.get_orbit_data, args=(t, self.orbit_q)) - self.orbit_p.start() - if not self.orbit_q.empty(): - ret = self.orbit_q.get() + if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR): + astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types + if self.orbit_fetch_future is None: + self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars) + if block: + self.orbit_fetch_future.result() + if self.orbit_fetch_future.done(): + ret = self.orbit_fetch_future.result() if ret: self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret - self.orbit_p.join() - self.orbit_p = None - - def __del__(self): - if self.orbit_p is not None: - self.orbit_p.kill() + self.orbit_fetch_future = None + self.last_fetch_orbits_t = t + + +def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types): + astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types) + cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}") + start_time = time.monotonic() + data = None + try: + astro_dog.get_orbit_data(t, only_predictions=True) + data = (astro_dog.orbits, astro_dog.orbit_fetched_times) + except RuntimeError as e: + cloudlog.info(f"No orbit data found. {e}") + cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s") + return data def create_measurement_msg(meas: GNSSMeasurement): diff --git a/selfdrive/locationd/test/test_laikad.py b/selfdrive/locationd/test/test_laikad.py index 7a8c1ecb13..7dc803d799 100755 --- a/selfdrive/locationd/test/test_laikad.py +++ b/selfdrive/locationd/test/test_laikad.py @@ -69,29 +69,46 @@ class TestLaikad(unittest.TestCase): self.assertEqual(256, len(correct_msgs)) self.assertEqual(256, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid])) - def test_laika_get_orbits(self): - laikad = Laikad(auto_update=False) - first_gps_time = None + def get_first_gps_time(self): for m in self.logs: if m.ubloxGnss.which == 'measurementReport': new_meas = read_raw_ublox(m.ubloxGnss.measurementReport) if len(new_meas) != 0: - first_gps_time = new_meas[0].recv_time - break + return new_meas[0].recv_time + + def test_laika_get_orbits(self): + laikad = Laikad(auto_update=False) + first_gps_time = self.get_first_gps_time() # Pretend process has loaded the orbits on startup by using the time of the first gps message. laikad.fetch_orbits(first_gps_time, block=True) - self.assertEqual(29, len(laikad.astro_dog.orbits.keys())) + self.assertEqual(29, len(laikad.astro_dog.orbits.values())) + self.assertGreater(min([len(v) for v in laikad.astro_dog.orbits.values()]), 0) @unittest.skip("Use to debug live data") def test_laika_get_orbits_now(self): laikad = Laikad(auto_update=False) laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True) prn = "G01" - self.assertLess(0, len(laikad.astro_dog.orbits[prn])) + self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0) prn = "R01" - self.assertLess(0, len(laikad.astro_dog.orbits[prn])) + self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0) print(min(laikad.astro_dog.orbits[prn], key=lambda e: e.epoch).epoch.as_datetime()) + def test_get_orbits_in_process(self): + laikad = Laikad(auto_update=False) + has_orbits = False + for m in self.logs: + laikad.process_ublox_msg(m.ubloxGnss, m.logMonoTime, block=False) + if laikad.orbit_fetch_future is not None: + laikad.orbit_fetch_future.result() + vals = laikad.astro_dog.orbits.values() + has_orbits = len(vals) > 0 and max([len(v) for v in vals]) > 0 + if has_orbits: + break + self.assertTrue(has_orbits) + self.assertGreater(len(laikad.astro_dog.orbit_fetched_times._ranges), 0) + self.assertEqual(None, laikad.orbit_fetch_future) + if __name__ == "__main__": unittest.main()