laikad: unit test refactor (#28295)

Refactor laika tests to use replay_process
pull/28298/head
Kacper Rączy 2 years ago committed by GitHub
parent d5d1ca11d4
commit 1f0ff21eee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      selfdrive/locationd/test/test_laikad.py

@ -2,12 +2,9 @@
import time
import unittest
from cereal import log
import cereal.messaging as messaging
from common.params import Params
from datetime import datetime
from unittest import mock
#from unittest.mock import patch
from tqdm import tqdm
from laika.constants import SECS_IN_DAY
@ -19,45 +16,22 @@ from laika.raw_gnss import GNSSMeasurement, read_raw_ublox, read_raw_qcom
from selfdrive.locationd.laikad import EPHEMERIS_CACHE, Laikad
from selfdrive.test.openpilotci import get_url
from tools.lib.logreader import LogReader
from selfdrive.manager.process_config import managed_processes
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
def get_ublox_gnss(ubloxraw):
with OpenpilotPrefix():
managed_processes['ubloxd'].start()
timeout_ms = 30
pm = messaging.PubMaster(['ubloxRaw'])
sock = messaging.sub_sock('ubloxGnss', timeout=timeout_ms)
log_msgs = []
log_t = []
for x in tqdm(ubloxraw):
pm.send(x.which(), x.as_builder())
ret = messaging.recv_one(sock)
if ret is not None:
msg = log.Event.new_message(ubloxGnss=ret.ubloxGnss.to_dict())
msg.logMonoTime = x.logMonoTime
log_msgs.append(msg)
log_t.append(1e-9 * x.logMonoTime)
assert managed_processes['ubloxd'].get_process_state_msg().running
assert len(log_msgs) > 1 or len(ubloxraw) == 0
managed_processes['ubloxd'].stop()
return log_t, log_msgs
from selfdrive.test.process_replay.process_replay import get_process_config, replay_process
GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime.from_datetime(datetime(2022, month=1, day=29, hour=12))
def get_log(segs=range(0)):
def get_log_ublox(segs=range(0)):
logs = []
for i in segs:
logs.extend(LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", i)))
raw_logs = [m for m in logs if m.which() == 'ubloxRaw']
all_logs = get_ublox_gnss(raw_logs)[1]
ublox_cfg = get_process_config("ubloxd")
all_logs = replay_process(ublox_cfg, logs)
low_gnss = []
for m in all_logs:
if m.ubloxGnss.which() != 'measurementReport':
if m.which() != "ubloxGnss" or m.ubloxGnss.which() != 'measurementReport':
continue
MAX_MEAS = 7
@ -72,6 +46,7 @@ def get_log(segs=range(0)):
low_gnss.append(m)
return all_logs, low_gnss
def get_log_qcom(segs=range(0)):
logs = []
for i in segs:
@ -79,13 +54,16 @@ def get_log_qcom(segs=range(0)):
all_logs = [m for m in logs if m.which() == 'qcomGnss']
return all_logs
def verify_messages(lr, laikad, return_one_success=False):
good_msgs = []
for m in lr:
if m.which() == 'ubloxGnss':
gnss_msg = m.ubloxGnss
else:
elif m.which() == 'qcomGnss':
gnss_msg = m.qcomGnss
else:
continue
msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=True)
if msg is not None and len(msg.gnssMeasurements.correctedMeasurements) > 0:
good_msgs.append(msg)
@ -101,7 +79,7 @@ def get_first_gps_time(logs):
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
if len(new_meas) > 0:
return new_meas[0].recv_time
else:
elif m.which() == "qcomGnss":
if m.qcomGnss.which == 'measurementReport':
new_meas = read_raw_qcom(m.qcomGnss.measurementReport)
if len(new_meas) > 0:
@ -116,14 +94,11 @@ def get_measurement_mock(gpstime, sat_ephemeris):
return meas
GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime.from_datetime(datetime(2022, month=1, day=29, hour=12))
class TestLaikad(unittest.TestCase):
@classmethod
def setUpClass(cls):
logs, low_gnss = get_log(range(1))
logs, low_gnss = get_log_ublox(range(1))
cls.logs = logs
cls.low_gnss = low_gnss
cls.logs_qcom = get_log_qcom(range(1))
@ -190,6 +165,9 @@ class TestLaikad(unittest.TestCase):
self.assertFalse(all(laikad.kf_valid(m.logMonoTime * 1e-9)))
kf_valid = False
for m in self.logs:
if m.which() != 'ubloxGnss':
continue
laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True)
kf_valid = all(laikad.kf_valid(m.logMonoTime * 1e-9))
if kf_valid:
@ -247,6 +225,9 @@ class TestLaikad(unittest.TestCase):
seen_internet_eph = False
for m in logs:
if m.which() != 'ubloxGnss' and m.which() != 'qcomGnss':
continue
gnss_msg = m.qcomGnss if use_qcom else m.ubloxGnss
out_msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=False)
if laikad.orbit_fetch_future is not None:
@ -301,8 +282,6 @@ class TestLaikad(unittest.TestCase):
self.assertTrue(any([x.source=='cache' for x in msg.gnssMeasurements.ephemerisStatuses]))
self.assertIsNotNone(msg)
#TODO test cache with only orbits
#with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method:
# # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently
@ -333,5 +312,7 @@ class TestLaikad(unittest.TestCase):
def dict_has_values(self, dct):
self.assertGreater(len(dct), 0)
self.assertGreater(min([len(v) for v in dct.values()]), 0)
if __name__ == "__main__":
unittest.main()

Loading…
Cancel
Save