openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

106 lines
4.1 KiB

#!/usr/bin/env python3
import unittest
from datetime import datetime
from laika.ephemeris import EphemerisType
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
from laika.raw_gnss import GNSSMeasurement, read_raw_ublox
from selfdrive.locationd.laikad import Laikad, create_measurement_msg
from selfdrive.test.openpilotci import get_url
from tools.lib.logreader import LogReader
def get_log(segs=range(0), route="4cf7a6ad03080c90|2021-09-29--13-46-36"):
  """Return the 'ubloxGnss' messages found in the given segments of a route.

  Args:
    segs: iterable of segment indices to read; the default (an empty range) reads nothing.
    route: route name handed to get_url; defaults to the route that was previously hard-coded here.

  Returns:
    A list of the log messages whose which() equals 'ubloxGnss', in segment order.
  """
  # Filter while reading instead of first collecting every message of every segment.
  return [m for i in segs for m in LogReader(get_url(route, i)) if m.which() == 'ubloxGnss']
def verify_messages(lr, laikad):
  """Feed every message of `lr` through `laikad` and collect the useful outputs.

  Each entry's ubloxGnss payload and logMonoTime are passed to
  laikad.process_ublox_msg. An output is kept only when it is not None and its
  gnssMeasurements.correctedMeasurements is non-empty.

  Returns:
    The list of kept outputs, in input order.
  """
  # Lazy generator: messages are still processed one by one, in order.
  outputs = (laikad.process_ublox_msg(entry.ubloxGnss, entry.logMonoTime) for entry in lr)
  return [
    out for out in outputs
    if out is not None and len(out.gnssMeasurements.correctedMeasurements) > 0
  ]
class TestLaikad(unittest.TestCase):
  """Tests for Laikad, run against the ubloxGnss messages of a logged segment."""

  @classmethod
  def setUpClass(cls):
    # Load segment index 0 once; every test reads from the same message list.
    cls.logs = get_log(range(1))

  def _assert_all_valid(self, expected, msgs):
    """Assert `msgs` holds `expected` messages and each has a valid positionECEF."""
    self.assertEqual(expected, len(msgs))
    self.assertEqual(expected, len([m for m in msgs if m.gnssMeasurements.positionECEF.valid]))

  def test_create_msg_without_errors(self):
    """create_measurement_msg accepts a hand-built GPS measurement and reports 'gps'."""
    gpstime = GPSTime.from_datetime(datetime.now())
    meas = GNSSMeasurement(ConstellationId.GPS, 1, gpstime.week, gpstime.tow, {'C1C': 0., 'D1C': 0.}, {'C1C': 0., 'D1C': 0.})
    # Fake observables_final to be correct
    meas.observables_final = meas.observables
    msg = create_measurement_msg(meas)

    self.assertEqual(msg.constellationId, 'gps')

  def test_laika_online(self):
    """With auto_update on and ULTRA_RAPID_ORBIT ephemeris, 560 messages are produced, all valid."""
    laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT)
    correct_msgs = verify_messages(self.logs, laikad)
    self._assert_all_valid(560, correct_msgs)

  def test_laika_online_nav_only(self):
    """With auto_update on and NAV ephemeris only, 560 messages are produced, all valid."""
    laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.NAV)
    correct_msgs = verify_messages(self.logs, laikad)
    self._assert_all_valid(560, correct_msgs)

  def test_laika_offline(self):
    """With auto_update off, 256 messages are produced, all valid."""
    # Setting auto_update to False forces the use of ephemeris messages
    laikad = Laikad(auto_update=False)
    correct_msgs = verify_messages(self.logs, laikad)
    self._assert_all_valid(256, correct_msgs)

  def test_laika_offline_ephem_at_start(self):
    """Offline, with the segment's ephemeris messages replayed before the full log."""
    # Test offline but process ephemeris msgs of segment first
    laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV)
    ephemeris_logs = [m for m in self.logs if m.ubloxGnss.which() == 'ephemeris']
    correct_msgs = verify_messages(ephemeris_logs+self.logs, laikad)

    self.assertEqual(554, len(correct_msgs))
    # NOTE(review): this upper bound is already implied by the assertion above
    # (the valid messages are a subset of correct_msgs); confirm whether a
    # stricter check was intended.
    self.assertGreaterEqual(554, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))

  def test_laika_get_orbits(self):
    """Orbits fetched up front for the first measurement time give 560 valid messages offline."""
    laikad = Laikad(auto_update=False)
    first_gps_time = None
    for m in self.logs:
      # Call which() rather than comparing the bare attribute, matching the
      # other union checks in this file.
      if m.ubloxGnss.which() == 'measurementReport':
        new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
        if len(new_meas) != 0:
          first_gps_time = new_meas[0].recv_time
          break
    # Fail here with a clear message instead of handing None to fetch_orbits.
    self.assertIsNotNone(first_gps_time, "no measurementReport with measurements found in the logs")

    # Pretend thread has loaded the orbits on startup by using the time of the first gps message.
    laikad.fetch_orbits(first_gps_time)
    self.assertEqual(31, len(laikad.astro_dog.orbits.keys()))
    correct_msgs = verify_messages(self.logs, laikad)
    self._assert_all_valid(560, correct_msgs)

  @unittest.skip("Use to debug live data")
  def test_laika_get_orbits_now(self):
    """Debug aid: fetch orbits for the current time and print the epochs obtained."""
    laikad = Laikad(auto_update=False)
    laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()))
    print(laikad.latest_epoch_fetched.as_datetime())
    # Earliest epoch among the entries stored under the first orbit key.
    print(min(laikad.astro_dog.orbits[list(laikad.astro_dog.orbits.keys())[0]], key=lambda e: e.epoch).epoch.as_datetime())
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
  unittest.main()