openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

293 lines
11 KiB

#!/usr/bin/env python3
import time
import unittest
from collections import defaultdict
from datetime import datetime
from unittest import mock
from unittest.mock import patch
from common.params import Params
from laika.constants import SECS_IN_DAY
from laika.downloader import DownloadFailed
from laika.ephemeris import EphemerisType, GPSEphemeris
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId, TimeRangeHolder
from laika.raw_gnss import GNSSMeasurement, read_raw_ublox
from selfdrive.locationd.laikad import EPHEMERIS_CACHE, EphemerisSourceType, Laikad, create_measurement_msg
from selfdrive.test.openpilotci import get_url
from tools.lib.logreader import LogReader
def get_log(segs=range(0)):
logs = []
for i in segs:
logs.extend(LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", i)))
all_logs = [m for m in logs if m.which() == 'ubloxGnss']
low_gnss = []
for m in all_logs:
if m.ubloxGnss.which() != 'measurementReport':
continue
MAX_MEAS = 7
if m.ubloxGnss.measurementReport.numMeas > MAX_MEAS:
mb = m.as_builder()
mb.ubloxGnss.measurementReport.numMeas = MAX_MEAS
mb.ubloxGnss.measurementReport.measurements = list(m.ubloxGnss.measurementReport.measurements)[:MAX_MEAS]
mb.ubloxGnss.measurementReport.measurements[0].pseudorange += 1000
low_gnss.append(mb.as_reader())
else:
low_gnss.append(m)
return all_logs, low_gnss
def verify_messages(lr, laikad, return_one_success=False):
good_msgs = []
for m in lr:
msg = laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True)
if msg is not None and len(msg.gnssMeasurements.correctedMeasurements) > 0:
good_msgs.append(msg)
if return_one_success:
return msg
return good_msgs
def get_first_gps_time(logs):
for m in logs:
if m.ubloxGnss.which == 'measurementReport':
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
if len(new_meas) > 0:
return new_meas[0].recv_time
def get_measurement_mock(gpstime, sat_ephemeris):
meas = GNSSMeasurement(ConstellationId.GPS, 1, gpstime.week, gpstime.tow, {'C1C': 0., 'D1C': 0.}, {'C1C': 0., 'D1C': 0.})
# Fake measurement being processed
meas.observables_final = meas.observables
meas.sat_ephemeris = sat_ephemeris
return meas
GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime.from_datetime(datetime(2022, month=1, day=29, hour=12))
class TestLaikad(unittest.TestCase):
@classmethod
def setUpClass(cls):
logs, low_gnss = get_log(range(1))
cls.logs = logs
cls.low_gnss = low_gnss
first_gps_time = get_first_gps_time(logs)
cls.first_gps_time = first_gps_time
def setUp(self):
Params().remove(EPHEMERIS_CACHE)
def test_fetch_navs_non_blocking(self):
gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1))
laikad = Laikad()
laikad.fetch_navs(gpstime, block=False)
laikad.orbit_fetch_future.result(30)
# Get results and save orbits to laikad:
laikad.fetch_navs(gpstime, block=False)
ephem = laikad.astro_dog.navs['G01'][0]
self.assertIsNotNone(ephem)
laikad.fetch_navs(gpstime+2*SECS_IN_DAY, block=False)
laikad.orbit_fetch_future.result(30)
# Get results and save orbits to laikad:
laikad.fetch_navs(gpstime + 2 * SECS_IN_DAY, block=False)
ephem2 = laikad.astro_dog.navs['G01'][0]
self.assertIsNotNone(ephem)
self.assertNotEqual(ephem, ephem2)
def test_fetch_navs_with_wrong_clocks(self):
laikad = Laikad()
def check_has_navs():
self.assertGreater(len(laikad.astro_dog.navs), 0)
ephem = laikad.astro_dog.navs['G01'][0]
self.assertIsNotNone(ephem)
real_current_time = GPSTime.from_datetime(datetime(2021, month=3, day=1))
wrong_future_clock_time = real_current_time + SECS_IN_DAY
laikad.fetch_navs(wrong_future_clock_time, block=True)
check_has_navs()
self.assertEqual(laikad.last_fetch_navs_t, wrong_future_clock_time)
# Test fetching orbits with earlier time
assert real_current_time < laikad.last_fetch_navs_t
laikad.astro_dog.orbits = {}
laikad.fetch_navs(real_current_time, block=True)
check_has_navs()
self.assertEqual(laikad.last_fetch_navs_t, real_current_time)
def test_ephemeris_source_in_msg(self):
data_mock = defaultdict(str)
data_mock['sv_id'] = 1
gpstime = GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC
laikad = Laikad()
laikad.fetch_navs(gpstime, block=True)
meas = get_measurement_mock(gpstime, laikad.astro_dog.navs['R01'][0])
msg = create_measurement_msg(meas)
self.assertEqual(msg.ephemerisSource.type.raw, EphemerisSourceType.nav)
# Verify gps satellite returns same source
meas = get_measurement_mock(gpstime, laikad.astro_dog.navs['R01'][0])
msg = create_measurement_msg(meas)
self.assertEqual(msg.ephemerisSource.type.raw, EphemerisSourceType.nav)
# Test nasa source by using older date
gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1))
laikad = Laikad()
laikad.fetch_navs(gpstime, block=True)
meas = get_measurement_mock(gpstime, laikad.astro_dog.navs['G01'][0])
msg = create_measurement_msg(meas)
self.assertEqual(msg.ephemerisSource.type.raw, EphemerisSourceType.nav)
# Test nav source type
ephem = GPSEphemeris(data_mock, gpstime)
meas = get_measurement_mock(gpstime, ephem)
msg = create_measurement_msg(meas)
self.assertEqual(msg.ephemerisSource.type.raw, EphemerisSourceType.nav)
def test_laika_online(self):
laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT)
correct_msgs = verify_messages(self.logs, laikad)
correct_msgs_expected = 554
self.assertEqual(correct_msgs_expected, len(correct_msgs))
self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
def test_kf_becomes_valid(self):
laikad = Laikad(auto_update=False)
m = self.logs[0]
self.assertFalse(all(laikad.kf_valid(m.logMonoTime * 1e-9)))
kf_valid = False
for m in self.logs:
laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True)
kf_valid = all(laikad.kf_valid(m.logMonoTime * 1e-9))
if kf_valid:
break
self.assertTrue(kf_valid)
def test_laika_online_nav_only(self):
laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.NAV)
# Disable fetch_orbits to test NAV only
correct_msgs = verify_messages(self.logs, laikad)
correct_msgs_expected = 554
self.assertEqual(correct_msgs_expected, len(correct_msgs))
self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
@mock.patch('laika.downloader.download_and_cache_file')
def test_laika_offline(self, downloader_mock):
downloader_mock.side_effect = DownloadFailed("Mock download failed")
laikad = Laikad(auto_update=False)
laikad.fetch_navs(GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC, block=True)
@mock.patch('laika.downloader.download_and_cache_file')
def test_download_failed_russian_source(self, downloader_mock):
downloader_mock.side_effect = DownloadFailed
laikad = Laikad(auto_update=False)
correct_msgs = verify_messages(self.logs, laikad)
self.assertEqual(0, len(correct_msgs))
self.assertEqual(0, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
def test_laika_get_orbits(self):
laikad = Laikad(auto_update=False)
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad.fetch_navs(self.first_gps_time, block=True)
self.dict_has_values(laikad.astro_dog.navs)
@unittest.skip("Use to debug live data")
def test_laika_get_navs_now(self):
laikad = Laikad(auto_update=False)
laikad.fetch_navs(GPSTime.from_datetime(datetime.utcnow()), block=True)
prn = "G01"
self.assertGreater(len(laikad.astro_dog.navs[prn]), 0)
prn = "R01"
self.assertGreater(len(laikad.astro_dog.navs[prn]), 0)
print(min(laikad.astro_dog.navs[prn], key=lambda e: e.epoch).epoch.as_datetime())
def test_get_navs_in_process(self):
laikad = Laikad(auto_update=False)
has_navs = False
for m in self.logs:
laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=False)
if laikad.orbit_fetch_future is not None:
laikad.orbit_fetch_future.result()
vals = laikad.astro_dog.navs.values()
has_navs = len(vals) > 0 and max([len(v) for v in vals]) > 0
if has_navs:
break
self.assertTrue(has_navs)
self.assertGreater(len(laikad.astro_dog.navs_fetched_times._ranges), 0)
self.assertEqual(None, laikad.orbit_fetch_future)
def test_cache(self):
laikad = Laikad(auto_update=True, save_ephemeris=True)
def wait_for_cache():
max_time = 2
while Params().get(EPHEMERIS_CACHE) is None:
time.sleep(0.1)
max_time -= 0.1
if max_time < 0:
self.fail("Cache has not been written after 2 seconds")
# Test cache with no ephemeris
laikad.cache_ephemeris(t=GPSTime(0, 0))
wait_for_cache()
Params().remove(EPHEMERIS_CACHE)
#laikad.astro_dog.get_navs(self.first_gps_time)
laikad.fetch_navs(self.first_gps_time, block=True)
# Wait for cache to save
wait_for_cache()
# Check both nav and orbits separate
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV, save_ephemeris=True)
# Verify navs are loaded from cache
self.dict_has_values(laikad.astro_dog.navs)
# Verify cache is working for only nav by running a segment
msg = verify_messages(self.logs, laikad, return_one_success=True)
self.assertIsNotNone(msg)
with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method:
# Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently
laikad.astro_dog.orbit_fetched_times = TimeRangeHolder()
laikad.fetch_navs(self.first_gps_time, block=False)
mock_method.assert_not_called()
# Verify cache is working for only orbits by running a segment
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True)
msg = verify_messages(self.logs, laikad, return_one_success=True)
self.assertIsNotNone(msg)
# Verify orbit data is not downloaded
mock_method.assert_not_called()
def test_low_gnss_meas(self):
cnt = 0
laikad = Laikad()
for m in self.low_gnss:
msg = laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True)
if msg is None:
continue
gm = msg.gnssMeasurements
if len(gm.correctedMeasurements) != 0 and gm.positionECEF.valid:
cnt += 1
self.assertEqual(cnt, 554)
def dict_has_values(self, dct):
self.assertGreater(len(dct), 0)
self.assertGreater(min([len(v) for v in dct.values()]), 0)
if __name__ == "__main__":
unittest.main()