From 0071d28b7bf8e2818755e23353e29549a2b5130d Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Tue, 28 Jun 2022 13:43:15 +0200 Subject: [PATCH] add laikad to process replay (#24889) * merge * Fix closing process executor after fetching orbits * cleanup * Add ref commit and revert test_processes hack * Fix * Fix ref * Fix test * Temp * Temp * Trying * Trying * Cleanup and change test * add ref commit * remove print * fix test getting stuck * cleanup fetch_orbits Co-authored-by: Gijs Koning old-commit-hash: 3823f55476b56fd3db0afe686b1d2f3d508817bc --- selfdrive/locationd/laikad.py | 50 ++++++++++++------- selfdrive/locationd/test/test_laikad.py | 27 ++++++++-- selfdrive/test/process_replay/README.md | 3 +- .../test/process_replay/process_replay.py | 18 +++++++ selfdrive/test/process_replay/ref_commit | 2 +- selfdrive/test/profiling/profiler.py | 3 ++ 6 files changed, 81 insertions(+), 22 deletions(-) diff --git a/selfdrive/locationd/laikad.py b/selfdrive/locationd/laikad.py index ba665b7530..60028b637a 100755 --- a/selfdrive/locationd/laikad.py +++ b/selfdrive/locationd/laikad.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import json +import os import time from collections import defaultdict from concurrent.futures import Future, ProcessPoolExecutor @@ -32,8 +33,10 @@ class Laikad: save_ephemeris=False, last_known_position=None): self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True) self.gnss_kf = GNSSKalman(GENERATED_DIR) - self.orbit_fetch_executor = ProcessPoolExecutor() + + self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None self.orbit_fetch_future: Optional[Future] = None + self.last_fetch_orbits_t = None self.last_cached_t = None self.save_ephemeris = save_ephemeris @@ -44,9 +47,13 @@ class Laikad: self.last_pos_fix_t = None def load_cache(self): + if not self.save_ephemeris: + return + cache = Params().get(EPHEMERIS_CACHE) if not cache: return + try: cache = json.loads(cache, object_hook=deserialize_hook) self.astro_dog.add_orbits(cache['orbits']) @@ -71,7 +78,7 @@ class Laikad: self.last_pos_residual = pos_fix_residual self.last_pos_fix_t = t return self.last_pos_fix - + def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False): if ublox_msg.which == 'measurementReport': t = ublox_mono_time * 1e-9 @@ -152,17 +159,22 @@ class Laikad: def fetch_orbits(self, t: GPSTime, block): if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR): astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types - if self.orbit_fetch_future is None: + + ret = None + + if block: + ret = get_orbit_data(t, *astro_dog_vars) + elif self.orbit_fetch_future is None: + self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1) self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars) - if block: - self.orbit_fetch_future.result() - if self.orbit_fetch_future.done(): - ret = self.orbit_fetch_future.result() + elif self.orbit_fetch_future.done(): self.last_fetch_orbits_t = t - if ret: - self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret - self.cache_ephemeris(t=t) - self.orbit_fetch_future = None + ret = self.orbit_fetch_future.result() + self.orbit_fetch_executor = self.orbit_fetch_future = None + + if ret is not None: + self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret + self.cache_ephemeris(t=t) def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types): @@ -174,7 +186,7 @@ def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types): astro_dog.get_orbit_data(t, only_predictions=True) data = (astro_dog.orbits, astro_dog.orbit_fetched_times) except RuntimeError as e: - cloudlog.info(f"No orbit data found. {e}") + cloudlog.warning(f"No orbit data found. {e}") cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s") return data @@ -255,17 +267,21 @@ class EphemerisSourceType(IntEnum): glonassIacUltraRapid = 2 -def main(): - sm = messaging.SubMaster(['ubloxGnss']) - pm = messaging.PubMaster(['gnssMeasurements']) +def main(sm=None, pm=None): + if sm is None: + sm = messaging.SubMaster(['ubloxGnss']) + if pm is None: + pm = messaging.PubMaster(['gnssMeasurements']) + + replay = "REPLAY" in os.environ # todo get last_known_position - laikad = Laikad(save_ephemeris=True) + laikad = Laikad(save_ephemeris=not replay) while True: sm.update() if sm.updated['ubloxGnss']: ublox_msg = sm['ubloxGnss'] - msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss']) + msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'], block=replay) if msg is not None: pm.send('gnssMeasurements', msg) diff --git a/selfdrive/locationd/test/test_laikad.py b/selfdrive/locationd/test/test_laikad.py index 3a72303b0c..c353da9624 100755 --- a/selfdrive/locationd/test/test_laikad.py +++ b/selfdrive/locationd/test/test_laikad.py @@ -7,6 +7,7 @@ from unittest import mock from unittest.mock import Mock, patch from common.params import Params +from laika.constants import SECS_IN_DAY from laika.ephemeris import EphemerisType, GPSEphemeris from laika.gps_time import GPSTime from laika.helpers import ConstellationId, TimeRangeHolder @@ -62,6 +63,26 @@ class TestLaikad(unittest.TestCase): def setUp(self): Params().delete(EPHEMERIS_CACHE) + def test_fetch_orbits_non_blocking(self): + gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1)) + laikad = Laikad() + laikad.fetch_orbits(gpstime, block=False) + laikad.orbit_fetch_future.result(5) + # Get results and save orbits to laikad: + laikad.fetch_orbits(gpstime, block=False) + + ephem = laikad.astro_dog.orbits['G01'][0] + self.assertIsNotNone(ephem) + + laikad.fetch_orbits(gpstime+2*SECS_IN_DAY, block=False) + laikad.orbit_fetch_future.result(5) + # Get results and save orbits to laikad: + laikad.fetch_orbits(gpstime + 2 * SECS_IN_DAY, block=False) + + ephem2 = laikad.astro_dog.orbits['G01'][0] + self.assertIsNotNone(ephem) + self.assertNotEqual(ephem, ephem2) + def test_ephemeris_source_in_msg(self): data_mock = defaultdict(str) data_mock['sv_id'] = 1 @@ -155,7 +176,7 @@ class TestLaikad(unittest.TestCase): while Params().get(EPHEMERIS_CACHE) is None: time.sleep(0.1) max_time -= 0.1 - if max_time == 0: + if max_time < 0: self.fail("Cache has not been written after 2 seconds") # Test cache with no ephemeris @@ -170,7 +191,7 @@ class TestLaikad(unittest.TestCase): wait_for_cache() # Check both nav and orbits separate - laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV) + laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV, save_ephemeris=True) # Verify orbits and nav are loaded from cache self.dict_has_values(laikad.astro_dog.orbits) self.dict_has_values(laikad.astro_dog.nav) @@ -185,7 +206,7 @@ class TestLaikad(unittest.TestCase): mock_method.assert_not_called() # Verify cache is working for only orbits by running a segment - laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT) + laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True) msg = verify_messages(self.logs, laikad, return_one_success=True) self.assertIsNotNone(msg) # Verify orbit data is not downloaded diff --git a/selfdrive/test/process_replay/README.md b/selfdrive/test/process_replay/README.md index 9fa4ca644a..531ddb3a02 100644 --- a/selfdrive/test/process_replay/README.md +++ b/selfdrive/test/process_replay/README.md @@ -5,7 +5,7 @@ Process replay is a regression test designed to identify any changes in the outp If the test fails, make sure that you didn't unintentionally change anything. If there are intentional changes, the reference logs will be updated. Use `test_processes.py` to run the test locally. -Use `FILEREADER_CACHE='1' test_processes.py` to cache log files. +Use `FILEREADER_CACHE='1' test_processes.py` to cache log files. Currently the following processes are tested: @@ -15,6 +15,7 @@ Currently the following processes are tested: * calibrationd * dmonitoringd * locationd +* laikad * paramsd * ubloxd diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 228f07c4c2..def61a10a1 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -236,6 +236,13 @@ def ublox_rcv_callback(msg): return [] +def laika_rcv_callback(msg, CP, cfg, fsm): + if msg.ubloxGnss.which() == "measurementReport": + return ["gnssMeasurements"], True + else: + return [], False + + CONFIGS = [ ProcessConfig( proc_name="controlsd", @@ -338,6 +345,17 @@ CONFIGS = [ tolerance=None, fake_pubsubmaster=False, ), + ProcessConfig( + proc_name="laikad", + pub_sub={ + "ubloxGnss": ["gnssMeasurements"], + }, + ignore=["logMonoTime"], + init_callback=get_car_params, + should_recv_callback=laika_rcv_callback, + tolerance=NUMPY_TOLERANCE, + fake_pubsubmaster=True, + ), ] diff --git a/selfdrive/test/process_replay/ref_commit b/selfdrive/test/process_replay/ref_commit index 1402284b96..3e02830bf4 100644 --- a/selfdrive/test/process_replay/ref_commit +++ b/selfdrive/test/process_replay/ref_commit @@ -1 +1 @@ -a16ca1082cd493f6cea5252eaaba9f8c6574334a +2ee969b34585f8055bb3eabab2dcc4061cc4bef9 \ No newline at end of file diff --git a/selfdrive/test/profiling/profiler.py b/selfdrive/test/profiling/profiler.py index 5f73176fab..91226fc577 100755 --- a/selfdrive/test/profiling/profiler.py +++ b/selfdrive/test/profiling/profiler.py @@ -53,6 +53,7 @@ def profile(proc, func, car='toyota'): msgs = list(LogReader(rlog_url)) * int(os.getenv("LOOP", "1")) os.environ['FINGERPRINT'] = fingerprint + os.environ['REPLAY'] = "1" def run(sm, pm, can_sock): try: @@ -81,12 +82,14 @@ if __name__ == '__main__': from selfdrive.controls.radard import radard_thread from selfdrive.locationd.paramsd import main as paramsd_thread from selfdrive.controls.plannerd import main as plannerd_thread + from selfdrive.locationd.laikad import main as laikad_thread procs = { 'radard': radard_thread, 'controlsd': controlsd_thread, 'paramsd': paramsd_thread, 'plannerd': plannerd_thread, + 'laikad': laikad_thread, } proc = sys.argv[1]