add laikad to process replay (#24889)

* merge

* Fix closing process executor after fetching orbits

* cleanup

* Add ref commit and revert test_processes hack

* Fix

* Fix ref

* Fix test

* Temp

* Temp

* Trying

* Trying

* Cleanup and change test

* add ref commit

* remove print

* fix test getting stuck

* cleanup fetch_orbits

Co-authored-by: Gijs Koning <gijs-koning@live.nl>
pull/24983/head
Willem Melching 3 years ago committed by GitHub
parent 4cf63f4758
commit 3823f55476
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 50
      selfdrive/locationd/laikad.py
  2. 27
      selfdrive/locationd/test/test_laikad.py
  3. 3
      selfdrive/test/process_replay/README.md
  4. 18
      selfdrive/test/process_replay/process_replay.py
  5. 2
      selfdrive/test/process_replay/ref_commit
  6. 3
      selfdrive/test/profiling/profiler.py

@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
import os
import time import time
from collections import defaultdict from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor from concurrent.futures import Future, ProcessPoolExecutor
@ -32,8 +33,10 @@ class Laikad:
save_ephemeris=False, last_known_position=None): save_ephemeris=False, last_known_position=None):
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True) self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True)
self.gnss_kf = GNSSKalman(GENERATED_DIR) self.gnss_kf = GNSSKalman(GENERATED_DIR)
self.orbit_fetch_executor = ProcessPoolExecutor()
self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None
self.orbit_fetch_future: Optional[Future] = None self.orbit_fetch_future: Optional[Future] = None
self.last_fetch_orbits_t = None self.last_fetch_orbits_t = None
self.last_cached_t = None self.last_cached_t = None
self.save_ephemeris = save_ephemeris self.save_ephemeris = save_ephemeris
@ -44,9 +47,13 @@ class Laikad:
self.last_pos_fix_t = None self.last_pos_fix_t = None
def load_cache(self): def load_cache(self):
if not self.save_ephemeris:
return
cache = Params().get(EPHEMERIS_CACHE) cache = Params().get(EPHEMERIS_CACHE)
if not cache: if not cache:
return return
try: try:
cache = json.loads(cache, object_hook=deserialize_hook) cache = json.loads(cache, object_hook=deserialize_hook)
self.astro_dog.add_orbits(cache['orbits']) self.astro_dog.add_orbits(cache['orbits'])
@ -71,7 +78,7 @@ class Laikad:
self.last_pos_residual = pos_fix_residual self.last_pos_residual = pos_fix_residual
self.last_pos_fix_t = t self.last_pos_fix_t = t
return self.last_pos_fix return self.last_pos_fix
def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False): def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False):
if ublox_msg.which == 'measurementReport': if ublox_msg.which == 'measurementReport':
t = ublox_mono_time * 1e-9 t = ublox_mono_time * 1e-9
@ -152,17 +159,22 @@ class Laikad:
def fetch_orbits(self, t: GPSTime, block): def fetch_orbits(self, t: GPSTime, block):
if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR): if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR):
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types
if self.orbit_fetch_future is None:
ret = None
if block:
ret = get_orbit_data(t, *astro_dog_vars)
elif self.orbit_fetch_future is None:
self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1)
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars) self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
if block: elif self.orbit_fetch_future.done():
self.orbit_fetch_future.result()
if self.orbit_fetch_future.done():
ret = self.orbit_fetch_future.result()
self.last_fetch_orbits_t = t self.last_fetch_orbits_t = t
if ret: ret = self.orbit_fetch_future.result()
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret self.orbit_fetch_executor = self.orbit_fetch_future = None
self.cache_ephemeris(t=t)
self.orbit_fetch_future = None if ret is not None:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.cache_ephemeris(t=t)
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types): def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
@ -174,7 +186,7 @@ def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
astro_dog.get_orbit_data(t, only_predictions=True) astro_dog.get_orbit_data(t, only_predictions=True)
data = (astro_dog.orbits, astro_dog.orbit_fetched_times) data = (astro_dog.orbits, astro_dog.orbit_fetched_times)
except RuntimeError as e: except RuntimeError as e:
cloudlog.info(f"No orbit data found. {e}") cloudlog.warning(f"No orbit data found. {e}")
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s") cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s")
return data return data
@ -255,17 +267,21 @@ class EphemerisSourceType(IntEnum):
glonassIacUltraRapid = 2 glonassIacUltraRapid = 2
def main(): def main(sm=None, pm=None):
sm = messaging.SubMaster(['ubloxGnss']) if sm is None:
pm = messaging.PubMaster(['gnssMeasurements']) sm = messaging.SubMaster(['ubloxGnss'])
if pm is None:
pm = messaging.PubMaster(['gnssMeasurements'])
replay = "REPLAY" in os.environ
# todo get last_known_position # todo get last_known_position
laikad = Laikad(save_ephemeris=True) laikad = Laikad(save_ephemeris=not replay)
while True: while True:
sm.update() sm.update()
if sm.updated['ubloxGnss']: if sm.updated['ubloxGnss']:
ublox_msg = sm['ubloxGnss'] ublox_msg = sm['ubloxGnss']
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss']) msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'], block=replay)
if msg is not None: if msg is not None:
pm.send('gnssMeasurements', msg) pm.send('gnssMeasurements', msg)

@ -7,6 +7,7 @@ from unittest import mock
from unittest.mock import Mock, patch from unittest.mock import Mock, patch
from common.params import Params from common.params import Params
from laika.constants import SECS_IN_DAY
from laika.ephemeris import EphemerisType, GPSEphemeris from laika.ephemeris import EphemerisType, GPSEphemeris
from laika.gps_time import GPSTime from laika.gps_time import GPSTime
from laika.helpers import ConstellationId, TimeRangeHolder from laika.helpers import ConstellationId, TimeRangeHolder
@ -62,6 +63,26 @@ class TestLaikad(unittest.TestCase):
def setUp(self): def setUp(self):
Params().delete(EPHEMERIS_CACHE) Params().delete(EPHEMERIS_CACHE)
def test_fetch_orbits_non_blocking(self):
gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1))
laikad = Laikad()
laikad.fetch_orbits(gpstime, block=False)
laikad.orbit_fetch_future.result(5)
# Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime, block=False)
ephem = laikad.astro_dog.orbits['G01'][0]
self.assertIsNotNone(ephem)
laikad.fetch_orbits(gpstime+2*SECS_IN_DAY, block=False)
laikad.orbit_fetch_future.result(5)
# Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime + 2 * SECS_IN_DAY, block=False)
ephem2 = laikad.astro_dog.orbits['G01'][0]
self.assertIsNotNone(ephem)
self.assertNotEqual(ephem, ephem2)
def test_ephemeris_source_in_msg(self): def test_ephemeris_source_in_msg(self):
data_mock = defaultdict(str) data_mock = defaultdict(str)
data_mock['sv_id'] = 1 data_mock['sv_id'] = 1
@ -155,7 +176,7 @@ class TestLaikad(unittest.TestCase):
while Params().get(EPHEMERIS_CACHE) is None: while Params().get(EPHEMERIS_CACHE) is None:
time.sleep(0.1) time.sleep(0.1)
max_time -= 0.1 max_time -= 0.1
if max_time == 0: if max_time < 0:
self.fail("Cache has not been written after 2 seconds") self.fail("Cache has not been written after 2 seconds")
# Test cache with no ephemeris # Test cache with no ephemeris
@ -170,7 +191,7 @@ class TestLaikad(unittest.TestCase):
wait_for_cache() wait_for_cache()
# Check both nav and orbits separate # Check both nav and orbits separate
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV) laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV, save_ephemeris=True)
# Verify orbits and nav are loaded from cache # Verify orbits and nav are loaded from cache
self.dict_has_values(laikad.astro_dog.orbits) self.dict_has_values(laikad.astro_dog.orbits)
self.dict_has_values(laikad.astro_dog.nav) self.dict_has_values(laikad.astro_dog.nav)
@ -185,7 +206,7 @@ class TestLaikad(unittest.TestCase):
mock_method.assert_not_called() mock_method.assert_not_called()
# Verify cache is working for only orbits by running a segment # Verify cache is working for only orbits by running a segment
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT) laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True)
msg = verify_messages(self.logs, laikad, return_one_success=True) msg = verify_messages(self.logs, laikad, return_one_success=True)
self.assertIsNotNone(msg) self.assertIsNotNone(msg)
# Verify orbit data is not downloaded # Verify orbit data is not downloaded

@ -5,7 +5,7 @@ Process replay is a regression test designed to identify any changes in the outp
If the test fails, make sure that you didn't unintentionally change anything. If there are intentional changes, the reference logs will be updated. If the test fails, make sure that you didn't unintentionally change anything. If there are intentional changes, the reference logs will be updated.
Use `test_processes.py` to run the test locally. Use `test_processes.py` to run the test locally.
Use `FILEREADER_CACHE='1' test_processes.py` to cache log files. Use `FILEREADER_CACHE='1' test_processes.py` to cache log files.
Currently the following processes are tested: Currently the following processes are tested:
@ -15,6 +15,7 @@ Currently the following processes are tested:
* calibrationd * calibrationd
* dmonitoringd * dmonitoringd
* locationd * locationd
* laikad
* paramsd * paramsd
* ubloxd * ubloxd

@ -236,6 +236,13 @@ def ublox_rcv_callback(msg):
return [] return []
def laika_rcv_callback(msg, CP, cfg, fsm):
if msg.ubloxGnss.which() == "measurementReport":
return ["gnssMeasurements"], True
else:
return [], False
CONFIGS = [ CONFIGS = [
ProcessConfig( ProcessConfig(
proc_name="controlsd", proc_name="controlsd",
@ -338,6 +345,17 @@ CONFIGS = [
tolerance=None, tolerance=None,
fake_pubsubmaster=False, fake_pubsubmaster=False,
), ),
ProcessConfig(
proc_name="laikad",
pub_sub={
"ubloxGnss": ["gnssMeasurements"],
},
ignore=["logMonoTime"],
init_callback=get_car_params,
should_recv_callback=laika_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
] ]

@ -1 +1 @@
a16ca1082cd493f6cea5252eaaba9f8c6574334a 2ee969b34585f8055bb3eabab2dcc4061cc4bef9

@ -53,6 +53,7 @@ def profile(proc, func, car='toyota'):
msgs = list(LogReader(rlog_url)) * int(os.getenv("LOOP", "1")) msgs = list(LogReader(rlog_url)) * int(os.getenv("LOOP", "1"))
os.environ['FINGERPRINT'] = fingerprint os.environ['FINGERPRINT'] = fingerprint
os.environ['REPLAY'] = "1"
def run(sm, pm, can_sock): def run(sm, pm, can_sock):
try: try:
@ -81,12 +82,14 @@ if __name__ == '__main__':
from selfdrive.controls.radard import radard_thread from selfdrive.controls.radard import radard_thread
from selfdrive.locationd.paramsd import main as paramsd_thread from selfdrive.locationd.paramsd import main as paramsd_thread
from selfdrive.controls.plannerd import main as plannerd_thread from selfdrive.controls.plannerd import main as plannerd_thread
from selfdrive.locationd.laikad import main as laikad_thread
procs = { procs = {
'radard': radard_thread, 'radard': radard_thread,
'controlsd': controlsd_thread, 'controlsd': controlsd_thread,
'paramsd': paramsd_thread, 'paramsd': paramsd_thread,
'plannerd': plannerd_thread, 'plannerd': plannerd_thread,
'laikad': laikad_thread,
} }
proc = sys.argv[1] proc = sys.argv[1]

Loading…
Cancel
Save