add laikad to process replay (#24889)

* merge

* Fix closing process executor after fetching orbits

* cleanup

* Add ref commit and revert test_processes hack

* Fix

* Fix ref

* Fix test

* Temp

* Temp

* Trying

* Trying

* Cleanup and change test

* add ref commit

* remove print

* fix test getting stuck

* cleanup fetch_orbits

Co-authored-by: Gijs Koning <gijs-koning@live.nl>
old-commit-hash: 3823f55476
taco
Willem Melching 3 years ago committed by GitHub
parent 24d91781a6
commit 0071d28b7b
  1. 48
      selfdrive/locationd/laikad.py
  2. 27
      selfdrive/locationd/test/test_laikad.py
  3. 1
      selfdrive/test/process_replay/README.md
  4. 18
      selfdrive/test/process_replay/process_replay.py
  5. 2
      selfdrive/test/process_replay/ref_commit
  6. 3
      selfdrive/test/profiling/profiler.py

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import json
import os
import time
from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor
@ -32,8 +33,10 @@ class Laikad:
save_ephemeris=False, last_known_position=None):
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True)
self.gnss_kf = GNSSKalman(GENERATED_DIR)
self.orbit_fetch_executor = ProcessPoolExecutor()
self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None
self.orbit_fetch_future: Optional[Future] = None
self.last_fetch_orbits_t = None
self.last_cached_t = None
self.save_ephemeris = save_ephemeris
@ -44,9 +47,13 @@ class Laikad:
self.last_pos_fix_t = None
def load_cache(self):
if not self.save_ephemeris:
return
cache = Params().get(EPHEMERIS_CACHE)
if not cache:
return
try:
cache = json.loads(cache, object_hook=deserialize_hook)
self.astro_dog.add_orbits(cache['orbits'])
@ -152,17 +159,22 @@ class Laikad:
def fetch_orbits(self, t: GPSTime, block):
if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR):
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types
if self.orbit_fetch_future is None:
ret = None
if block:
ret = get_orbit_data(t, *astro_dog_vars)
elif self.orbit_fetch_future is None:
self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1)
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
if block:
self.orbit_fetch_future.result()
if self.orbit_fetch_future.done():
ret = self.orbit_fetch_future.result()
elif self.orbit_fetch_future.done():
self.last_fetch_orbits_t = t
if ret:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.cache_ephemeris(t=t)
self.orbit_fetch_future = None
ret = self.orbit_fetch_future.result()
self.orbit_fetch_executor = self.orbit_fetch_future = None
if ret is not None:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.cache_ephemeris(t=t)
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
@ -174,7 +186,7 @@ def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
astro_dog.get_orbit_data(t, only_predictions=True)
data = (astro_dog.orbits, astro_dog.orbit_fetched_times)
except RuntimeError as e:
cloudlog.info(f"No orbit data found. {e}")
cloudlog.warning(f"No orbit data found. {e}")
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s")
return data
@ -255,17 +267,21 @@ class EphemerisSourceType(IntEnum):
glonassIacUltraRapid = 2
def main():
sm = messaging.SubMaster(['ubloxGnss'])
pm = messaging.PubMaster(['gnssMeasurements'])
def main(sm=None, pm=None):
if sm is None:
sm = messaging.SubMaster(['ubloxGnss'])
if pm is None:
pm = messaging.PubMaster(['gnssMeasurements'])
replay = "REPLAY" in os.environ
# todo get last_known_position
laikad = Laikad(save_ephemeris=True)
laikad = Laikad(save_ephemeris=not replay)
while True:
sm.update()
if sm.updated['ubloxGnss']:
ublox_msg = sm['ubloxGnss']
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'])
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'], block=replay)
if msg is not None:
pm.send('gnssMeasurements', msg)

@ -7,6 +7,7 @@ from unittest import mock
from unittest.mock import Mock, patch
from common.params import Params
from laika.constants import SECS_IN_DAY
from laika.ephemeris import EphemerisType, GPSEphemeris
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId, TimeRangeHolder
@ -62,6 +63,26 @@ class TestLaikad(unittest.TestCase):
def setUp(self):
Params().delete(EPHEMERIS_CACHE)
def test_fetch_orbits_non_blocking(self):
gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1))
laikad = Laikad()
laikad.fetch_orbits(gpstime, block=False)
laikad.orbit_fetch_future.result(5)
# Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime, block=False)
ephem = laikad.astro_dog.orbits['G01'][0]
self.assertIsNotNone(ephem)
laikad.fetch_orbits(gpstime+2*SECS_IN_DAY, block=False)
laikad.orbit_fetch_future.result(5)
# Get results and save orbits to laikad:
laikad.fetch_orbits(gpstime + 2 * SECS_IN_DAY, block=False)
ephem2 = laikad.astro_dog.orbits['G01'][0]
self.assertIsNotNone(ephem)
self.assertNotEqual(ephem, ephem2)
def test_ephemeris_source_in_msg(self):
data_mock = defaultdict(str)
data_mock['sv_id'] = 1
@ -155,7 +176,7 @@ class TestLaikad(unittest.TestCase):
while Params().get(EPHEMERIS_CACHE) is None:
time.sleep(0.1)
max_time -= 0.1
if max_time == 0:
if max_time < 0:
self.fail("Cache has not been written after 2 seconds")
# Test cache with no ephemeris
@ -170,7 +191,7 @@ class TestLaikad(unittest.TestCase):
wait_for_cache()
# Check both nav and orbits separate
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV)
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV, save_ephemeris=True)
# Verify orbits and nav are loaded from cache
self.dict_has_values(laikad.astro_dog.orbits)
self.dict_has_values(laikad.astro_dog.nav)
@ -185,7 +206,7 @@ class TestLaikad(unittest.TestCase):
mock_method.assert_not_called()
# Verify cache is working for only orbits by running a segment
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT)
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True)
msg = verify_messages(self.logs, laikad, return_one_success=True)
self.assertIsNotNone(msg)
# Verify orbit data is not downloaded

@ -15,6 +15,7 @@ Currently the following processes are tested:
* calibrationd
* dmonitoringd
* locationd
* laikad
* paramsd
* ubloxd

@ -236,6 +236,13 @@ def ublox_rcv_callback(msg):
return []
def laika_rcv_callback(msg, CP, cfg, fsm):
if msg.ubloxGnss.which() == "measurementReport":
return ["gnssMeasurements"], True
else:
return [], False
CONFIGS = [
ProcessConfig(
proc_name="controlsd",
@ -338,6 +345,17 @@ CONFIGS = [
tolerance=None,
fake_pubsubmaster=False,
),
ProcessConfig(
proc_name="laikad",
pub_sub={
"ubloxGnss": ["gnssMeasurements"],
},
ignore=["logMonoTime"],
init_callback=get_car_params,
should_recv_callback=laika_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
]

@ -1 +1 @@
a16ca1082cd493f6cea5252eaaba9f8c6574334a
2ee969b34585f8055bb3eabab2dcc4061cc4bef9

@ -53,6 +53,7 @@ def profile(proc, func, car='toyota'):
msgs = list(LogReader(rlog_url)) * int(os.getenv("LOOP", "1"))
os.environ['FINGERPRINT'] = fingerprint
os.environ['REPLAY'] = "1"
def run(sm, pm, can_sock):
try:
@ -81,12 +82,14 @@ if __name__ == '__main__':
from selfdrive.controls.radard import radard_thread
from selfdrive.locationd.paramsd import main as paramsd_thread
from selfdrive.controls.plannerd import main as plannerd_thread
from selfdrive.locationd.laikad import main as laikad_thread
procs = {
'radard': radard_thread,
'controlsd': controlsd_thread,
'paramsd': paramsd_thread,
'plannerd': plannerd_thread,
'laikad': laikad_thread,
}
proc = sys.argv[1]

Loading…
Cancel
Save