Laikad: Use process for parsing orbits (#24769)

* Use Process instead of Thread to fetch orbits

* small refactor

* Cleanup
pull/24771/head
Gijs Koning 3 years ago committed by GitHub
parent b215d611b1
commit e7234e22b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 66
      selfdrive/locationd/laikad.py
  2. 10
      selfdrive/locationd/test/test_laikad.py

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import threading
import time import time
from typing import List from multiprocessing import Process, Queue
from typing import List, Optional
import numpy as np import numpy as np
from collections import defaultdict from collections import defaultdict
@ -27,14 +27,17 @@ class Laikad:
def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)): def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)):
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types) self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
self.gnss_kf = GNSSKalman(GENERATED_DIR) self.gnss_kf = GNSSKalman(GENERATED_DIR)
self.latest_time_msg = None self.orbit_p: Optional[Process] = None
self.orbit_q = Queue()
def process_ublox_msg(self, ublox_msg, ublox_mono_time: int): def process_ublox_msg(self, ublox_msg, ublox_mono_time: int):
if ublox_msg.which == 'measurementReport': if ublox_msg.which == 'measurementReport':
report = ublox_msg.measurementReport report = ublox_msg.measurementReport
new_meas = read_raw_ublox(report)
if report.gpsWeek > 0: if report.gpsWeek > 0:
self.latest_time_msg = GPSTime(report.gpsWeek, report.rcvTow) latest_msg_t = GPSTime(report.gpsWeek, report.rcvTow)
self.fetch_orbits(latest_msg_t + SECS_IN_MIN, block=False)
new_meas = read_raw_ublox(report)
measurements = process_measurements(new_meas, self.astro_dog) measurements = process_measurements(new_meas, self.astro_dog)
pos_fix = calc_pos_fix(measurements, min_measurements=4) pos_fix = calc_pos_fix(measurements, min_measurements=4)
# To get a position fix a minimum of 5 measurements are needed. # To get a position fix a minimum of 5 measurements are needed.
@ -104,18 +107,28 @@ class Laikad:
self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag) self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
def orbit_thread(self, end_event: threading.Event): def get_orbit_data(self, t: GPSTime, queue):
while not end_event.is_set(): cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
if self.latest_time_msg: start_time = time.monotonic()
self.fetch_orbits(self.latest_time_msg + SECS_IN_MIN) self.astro_dog.get_orbit_data(t, only_predictions=True)
time.sleep(0.1) cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s")
queue.put((self.astro_dog.orbits, self.astro_dog.orbit_fetched_times))
def fetch_orbits(self, t: GPSTime): def fetch_orbits(self, t: GPSTime, block):
if t not in self.astro_dog.orbit_fetched_times: if t not in self.astro_dog.orbit_fetched_times:
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}") if self.orbit_p is None:
start_time = time.monotonic() self.orbit_p = Process(target=self.get_orbit_data, args=(t, self.orbit_q))
self.astro_dog.get_orbit_data(t, only_predictions=True) self.orbit_p.start()
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s") ret = None
if block:
ret = self.orbit_q.get(block=True)
elif not self.orbit_q.empty():
ret = self.orbit_q.get()
if ret:
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
self.orbit_p.join()
self.orbit_p = None
def create_measurement_msg(meas: GNSSMeasurement): def create_measurement_msg(meas: GNSSMeasurement):
@ -162,21 +175,14 @@ def main():
pm = messaging.PubMaster(['gnssMeasurements']) pm = messaging.PubMaster(['gnssMeasurements'])
laikad = Laikad() laikad = Laikad()
while True:
end_event = threading.Event() sm.update()
threading.Thread(target=laikad.orbit_thread, args=(end_event,)).start()
try: if sm.updated['ubloxGnss']:
while not end_event.is_set(): ublox_msg = sm['ubloxGnss']
sm.update() msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'])
if msg is not None:
if sm.updated['ubloxGnss']: pm.send('gnssMeasurements', msg)
ublox_msg = sm['ubloxGnss']
msg = laikad.process_ublox_msg(ublox_msg, sm.logMonoTime['ubloxGnss'])
if msg is not None:
pm.send('gnssMeasurements', msg)
except (KeyboardInterrupt, SystemExit):
end_event.set()
raise
if __name__ == "__main__": if __name__ == "__main__":

@ -83,18 +83,14 @@ class TestLaikad(unittest.TestCase):
if len(new_meas) != 0: if len(new_meas) != 0:
first_gps_time = new_meas[0].recv_time first_gps_time = new_meas[0].recv_time
break break
# Pretend thread has loaded the orbits on startup by using the time of the first gps message. # Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad.fetch_orbits(first_gps_time) laikad.fetch_orbits(first_gps_time, block=True)
self.assertEqual(29, len(laikad.astro_dog.orbits.keys())) self.assertEqual(29, len(laikad.astro_dog.orbits.keys()))
correct_msgs = verify_messages(self.logs, laikad)
correct_msgs_expected = 560
self.assertEqual(correct_msgs_expected, len(correct_msgs))
self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
@unittest.skip("Use to debug live data") @unittest.skip("Use to debug live data")
def test_laika_get_orbits_now(self): def test_laika_get_orbits_now(self):
laikad = Laikad(auto_update=False) laikad = Laikad(auto_update=False)
laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow())) laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True)
prn = "G01" prn = "G01"
self.assertLess(0, len(laikad.astro_dog.orbits[prn])) self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
prn = "R01" prn = "R01"

Loading…
Cancel
Save