You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
97 lines
3.6 KiB
97 lines
3.6 KiB
#!/usr/bin/env python3
|
|
import unittest
|
|
from datetime import datetime
|
|
from unittest import mock
|
|
from unittest.mock import Mock
|
|
|
|
from laika.ephemeris import EphemerisType
|
|
from laika.gps_time import GPSTime
|
|
from laika.helpers import ConstellationId
|
|
from laika.raw_gnss import GNSSMeasurement, read_raw_ublox
|
|
from selfdrive.locationd.laikad import Laikad, create_measurement_msg
|
|
from selfdrive.test.openpilotci import get_url
|
|
from tools.lib.logreader import LogReader
|
|
|
|
|
|
def get_log(segs=range(0)):
|
|
logs = []
|
|
for i in segs:
|
|
logs.extend(LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", i)))
|
|
return [m for m in logs if m.which() == 'ubloxGnss']
|
|
|
|
|
|
def verify_messages(lr, laikad):
|
|
good_msgs = []
|
|
for m in lr:
|
|
msg = laikad.process_ublox_msg(m.ubloxGnss, m.logMonoTime, block=True)
|
|
if msg is not None and len(msg.gnssMeasurements.correctedMeasurements) > 0:
|
|
good_msgs.append(msg)
|
|
return good_msgs
|
|
|
|
|
|
class TestLaikad(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.logs = get_log(range(1))
|
|
|
|
def test_create_msg_without_errors(self):
|
|
gpstime = GPSTime.from_datetime(datetime.now())
|
|
meas = GNSSMeasurement(ConstellationId.GPS, 1, gpstime.week, gpstime.tow, {'C1C': 0., 'D1C': 0.}, {'C1C': 0., 'D1C': 0.})
|
|
# Fake observables_final to be correct
|
|
meas.observables_final = meas.observables
|
|
msg = create_measurement_msg(meas)
|
|
|
|
self.assertEqual(msg.constellationId, 'gps')
|
|
|
|
def test_laika_online(self):
|
|
laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT)
|
|
correct_msgs = verify_messages(self.logs, laikad)
|
|
|
|
correct_msgs_expected = 560
|
|
self.assertEqual(correct_msgs_expected, len(correct_msgs))
|
|
self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
|
|
|
|
def test_laika_online_nav_only(self):
|
|
laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.NAV)
|
|
# Disable fetch_orbits to test NAV only
|
|
laikad.fetch_orbits = Mock()
|
|
correct_msgs = verify_messages(self.logs, laikad)
|
|
correct_msgs_expected = 560
|
|
self.assertEqual(correct_msgs_expected, len(correct_msgs))
|
|
self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
|
|
|
|
@mock.patch('laika.downloader.download_and_cache_file')
|
|
def test_laika_offline(self, downloader_mock):
|
|
downloader_mock.side_effect = IOError
|
|
laikad = Laikad(auto_update=False)
|
|
correct_msgs = verify_messages(self.logs, laikad)
|
|
self.assertEqual(256, len(correct_msgs))
|
|
self.assertEqual(256, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
|
|
|
|
def test_laika_get_orbits(self):
|
|
laikad = Laikad(auto_update=False)
|
|
first_gps_time = None
|
|
for m in self.logs:
|
|
if m.ubloxGnss.which == 'measurementReport':
|
|
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
|
|
if len(new_meas) != 0:
|
|
first_gps_time = new_meas[0].recv_time
|
|
break
|
|
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
|
|
laikad.fetch_orbits(first_gps_time, block=True)
|
|
self.assertEqual(29, len(laikad.astro_dog.orbits.keys()))
|
|
|
|
@unittest.skip("Use to debug live data")
|
|
def test_laika_get_orbits_now(self):
|
|
laikad = Laikad(auto_update=False)
|
|
laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True)
|
|
prn = "G01"
|
|
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
|
|
prn = "R01"
|
|
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
|
|
print(min(laikad.astro_dog.orbits[prn], key=lambda e: e.epoch).epoch.as_datetime())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|
|
|