From e7234e22b4d11cc8f349d02855df9b3414096c0d Mon Sep 17 00:00:00 2001 From: Gijs Koning Date: Tue, 7 Jun 2022 20:55:39 +0200 Subject: [PATCH] Laikad: Use process for parsing orbits (#24769) * Use Process instead of Thread to fetch orbits * small refactor * Cleanup --- selfdrive/locationd/laikad.py | 66 ++++++++++++++----------- selfdrive/locationd/test/test_laikad.py | 10 ++-- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/selfdrive/locationd/laikad.py b/selfdrive/locationd/laikad.py index d8e32005f8..f0c52e7e03 100755 --- a/selfdrive/locationd/laikad.py +++ b/selfdrive/locationd/laikad.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -import threading import time -from typing import List +from multiprocessing import Process, Queue +from typing import List, Optional import numpy as np from collections import defaultdict @@ -27,14 +27,17 @@ class Laikad: def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)): self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types) self.gnss_kf = GNSSKalman(GENERATED_DIR) - self.latest_time_msg = None + self.orbit_p: Optional[Process] = None + self.orbit_q = Queue() def process_ublox_msg(self, ublox_msg, ublox_mono_time: int): if ublox_msg.which == 'measurementReport': report = ublox_msg.measurementReport - new_meas = read_raw_ublox(report) if report.gpsWeek > 0: - self.latest_time_msg = GPSTime(report.gpsWeek, report.rcvTow) + latest_msg_t = GPSTime(report.gpsWeek, report.rcvTow) + self.fetch_orbits(latest_msg_t + SECS_IN_MIN, block=False) + new_meas = read_raw_ublox(report) + measurements = process_measurements(new_meas, self.astro_dog) pos_fix = calc_pos_fix(measurements, min_measurements=4) # To get a position fix a minimum of 5 measurements are needed. @@ -104,18 +107,28 @@ class Laikad: self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag) - def orbit_thread(self, end_event: threading.Event): - while not end_event.is_set(): - if self.latest_time_msg: - self.fetch_orbits(self.latest_time_msg + SECS_IN_MIN) - time.sleep(0.1) + def get_orbit_data(self, t: GPSTime, queue): + cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}") + start_time = time.monotonic() + self.astro_dog.get_orbit_data(t, only_predictions=True) + cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s") + queue.put((self.astro_dog.orbits, self.astro_dog.orbit_fetched_times)) - def fetch_orbits(self, t: GPSTime): + def fetch_orbits(self, t: GPSTime, block): if t not in self.astro_dog.orbit_fetched_times: - cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}") - start_time = time.monotonic() - self.astro_dog.get_orbit_data(t, only_predictions=True) - cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s") + if self.orbit_p is None: + self.orbit_p = Process(target=self.get_orbit_data, args=(t, self.orbit_q)) + self.orbit_p.start() + ret = None + if block: + ret = self.orbit_q.get(block=True) + elif not self.orbit_q.empty(): + ret = self.orbit_q.get() + + if ret: + self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret + self.orbit_p.join() + self.orbit_p = None def create_measurement_msg(meas: GNSSMeasurement): @@ -162,21 +175,14 @@ def main(): pm = messaging.PubMaster(['gnssMeasurements']) laikad = Laikad() - - end_event = threading.Event() - threading.Thread(target=laikad.orbit_thread, args=(end_event,)).start() - try: - while not end_event.is_set(): - sm.update() - - if sm.updated['ubloxGnss']: - ublox_msg = sm['ubloxGnss'] - msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss']) - if msg is not None: - pm.send('gnssMeasurements', msg) - except (KeyboardInterrupt, SystemExit): - end_event.set() - raise + while True: + sm.update() + + if sm.updated['ubloxGnss']: + ublox_msg = sm['ubloxGnss'] + msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss']) + if msg is not None: + pm.send('gnssMeasurements', msg) if __name__ == "__main__": diff --git a/selfdrive/locationd/test/test_laikad.py b/selfdrive/locationd/test/test_laikad.py index 8cb0773b37..630a7525eb 100755 --- a/selfdrive/locationd/test/test_laikad.py +++ b/selfdrive/locationd/test/test_laikad.py @@ -83,18 +83,14 @@ class TestLaikad(unittest.TestCase): if len(new_meas) != 0: first_gps_time = new_meas[0].recv_time break - # Pretend thread has loaded the orbits on startup by using the time of the first gps message. - laikad.fetch_orbits(first_gps_time) + # Pretend process has loaded the orbits on startup by using the time of the first gps message. + laikad.fetch_orbits(first_gps_time, block=True) self.assertEqual(29, len(laikad.astro_dog.orbits.keys())) - correct_msgs = verify_messages(self.logs, laikad) - correct_msgs_expected = 560 - self.assertEqual(correct_msgs_expected, len(correct_msgs)) - self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid])) @unittest.skip("Use to debug live data") def test_laika_get_orbits_now(self): laikad = Laikad(auto_update=False) - laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow())) + laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True) prn = "G01" self.assertLess(0, len(laikad.astro_dog.orbits[prn])) prn = "R01"