|
|
|
@ -6,7 +6,7 @@ import cereal.messaging as messaging |
|
|
|
|
from common.params import Params |
|
|
|
|
from datetime import datetime |
|
|
|
|
from unittest import mock |
|
|
|
|
from unittest.mock import patch |
|
|
|
|
#from unittest.mock import patch |
|
|
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -14,7 +14,7 @@ from laika.constants import SECS_IN_DAY |
|
|
|
|
from laika.downloader import DownloadFailed |
|
|
|
|
from laika.ephemeris import EphemerisType, GPSEphemeris, ephemeris_structs |
|
|
|
|
from laika.gps_time import GPSTime |
|
|
|
|
from laika.helpers import ConstellationId, TimeRangeHolder |
|
|
|
|
from laika.helpers import ConstellationId |
|
|
|
|
from laika.raw_gnss import GNSSMeasurement, read_raw_ublox, read_raw_qcom |
|
|
|
|
from selfdrive.locationd.laikad import EPHEMERIS_CACHE, EphemerisSourceType, Laikad, create_measurement_msg |
|
|
|
|
from selfdrive.test.openpilotci import get_url |
|
|
|
@ -261,11 +261,10 @@ class TestLaikad(unittest.TestCase): |
|
|
|
|
self.assertGreater(len(laikad.astro_dog.navs[prn]), 0) |
|
|
|
|
prn = "R01" |
|
|
|
|
self.assertGreater(len(laikad.astro_dog.navs[prn]), 0) |
|
|
|
|
print(min(laikad.astro_dog.navs[prn], key=lambda e: e.epoch).epoch.as_datetime()) |
|
|
|
|
|
|
|
|
|
def test_get_navs_in_process(self): |
|
|
|
|
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs]): |
|
|
|
|
laikad = Laikad(auto_update=False, use_qcom=use_qcom) |
|
|
|
|
laikad = Laikad(auto_update=False, use_qcom=use_qcom, auto_fetch_navs=False) |
|
|
|
|
has_navs = False |
|
|
|
|
has_fix = False |
|
|
|
|
for m in logs: |
|
|
|
@ -275,14 +274,14 @@ class TestLaikad(unittest.TestCase): |
|
|
|
|
laikad.orbit_fetch_future.result() |
|
|
|
|
vals = laikad.astro_dog.navs.values() |
|
|
|
|
has_navs = len(vals) > 0 and max([len(v) for v in vals]) > 0 |
|
|
|
|
vals = laikad.astro_dog.orbits.values() |
|
|
|
|
vals = laikad.astro_dog.qcom_polys.values() |
|
|
|
|
has_polys = len(vals) > 0 and max([len(v) for v in vals]) > 0 |
|
|
|
|
if out_msg is not None: |
|
|
|
|
has_fix = has_fix or out_msg.gnssMeasurements.positionECEF.valid |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(has_navs or has_polys) |
|
|
|
|
self.assertTrue(has_fix) |
|
|
|
|
self.assertGreater(len(laikad.astro_dog.navs_fetched_times._ranges), 0) |
|
|
|
|
self.assertEqual(len(laikad.astro_dog.navs_fetched_times._ranges), 0) |
|
|
|
|
self.assertEqual(None, laikad.orbit_fetch_future) |
|
|
|
|
|
|
|
|
|
def test_cache(self): |
|
|
|
@ -318,19 +317,22 @@ class TestLaikad(unittest.TestCase): |
|
|
|
|
msg = verify_messages(logs, laikad, return_one_success=True) |
|
|
|
|
self.assertIsNotNone(msg) |
|
|
|
|
|
|
|
|
|
with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method: |
|
|
|
|
# Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently |
|
|
|
|
laikad.astro_dog.orbit_fetched_times = TimeRangeHolder() |
|
|
|
|
laikad.fetch_navs(self.first_gps_time, block=False) |
|
|
|
|
mock_method.assert_not_called() |
|
|
|
|
|
|
|
|
|
# Verify cache is working for only orbits by running a segment |
|
|
|
|
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True) |
|
|
|
|
msg = verify_messages(self.logs, laikad, return_one_success=True) |
|
|
|
|
self.assertIsNotNone(msg) |
|
|
|
|
# Verify orbit data is not downloaded |
|
|
|
|
mock_method.assert_not_called() |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
#TODO test cache with only orbits |
|
|
|
|
#with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method: |
|
|
|
|
# # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently |
|
|
|
|
# laikad.astro_dog.orbit_fetched_times = TimeRangeHolder() |
|
|
|
|
# laikad.fetch_navs(self.first_gps_time, block=False) |
|
|
|
|
# mock_method.assert_not_called() |
|
|
|
|
|
|
|
|
|
# # Verify cache is working for only orbits by running a segment |
|
|
|
|
# laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True) |
|
|
|
|
# msg = verify_messages(self.logs, laikad, return_one_success=True) |
|
|
|
|
# self.assertIsNotNone(msg) |
|
|
|
|
# # Verify orbit data is not downloaded |
|
|
|
|
# mock_method.assert_not_called() |
|
|
|
|
#break |
|
|
|
|
|
|
|
|
|
def test_low_gnss_meas(self): |
|
|
|
|
cnt = 0 |
|
|
|
@ -348,5 +350,6 @@ class TestLaikad(unittest.TestCase): |
|
|
|
|
self.assertGreater(len(dct), 0) |
|
|
|
|
self.assertGreater(min([len(v) for v in dct.values()]), 0) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
unittest.main() |
|
|
|
|