openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

117 lines
4.1 KiB

#!/usr/bin/env python3
import unittest
import time
import numpy as np
from laika import AstroDog
from laika.helpers import ConstellationId
from laika.raw_gnss import correct_measurements, process_measurements, read_raw_ublox
from laika.opt import calc_pos_fix
from openpilot.selfdrive.test.openpilotci import get_url
from openpilot.system.hardware.hw import Paths
from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.helpers import with_processes
import cereal.messaging as messaging
def get_gnss_measurements(log_reader):
    """Collect the raw GNSS measurements contained in a log.

    Iterates ``log_reader``, keeps only ``ubloxGnss`` messages whose union is
    set to ``measurementReport``, skips reports that carry no measurements and
    converts every remaining report with ``read_raw_ublox``.

    Args:
        log_reader: Iterable of log messages exposing ``which()``.

    Returns:
        A list with one ``read_raw_ublox(report)`` result per non-empty
        measurement report, in the order the reports appear in the log.
    """
    gnss_measurements = []
    for msg in log_reader:
        if msg.which() != "ubloxGnss":
            continue
        ublox_msg = msg.ubloxGnss
        # Call which() here, just as on the outer message: comparing the
        # un-called attribute to a string only works if the message type
        # happens to make that attribute string-comparable.
        if ublox_msg.which() != 'measurementReport':
            continue
        report = ublox_msg.measurementReport
        if len(report.measurements) > 0:
            gnss_measurements.append(read_raw_ublox(report))
    return gnss_measurements
def get_ublox_raw(log_reader):
    """Return every ``ubloxRaw`` message from ``log_reader``, in log order.

    Args:
        log_reader: Iterable of log messages exposing ``which()``.

    Returns:
        A list of the messages (unmodified) whose ``which()`` is ``"ubloxRaw"``.
    """
    return [entry for entry in log_reader if entry.which() == "ubloxRaw"]
class TestUbloxProcessing(unittest.TestCase):
    """Tests for ublox GNSS parsing, position fixing and ephemeris handling.

    All tests run against recorded log segments loaded once in ``setUpClass``.
    """

    # Number of measurement groups fed through the position-fix pipeline.
    NUM_TEST_PROCESS_MEAS = 10

    @classmethod
    def setUpClass(cls):
        """Load the GNSS measurements and raw ublox messages shared by the tests."""
        measurement_log = LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", 0))
        cls.gnss_measurements = get_gnss_measurements(measurement_log)

        # test gps ephemeris continuity check (drive has ephemeris issues with cutover data)
        cutover_log = LogReader(get_url("37b6542f3211019a|2023-01-15--23-45-10", 14))
        cls.ublox_raw = get_ublox_raw(cutover_log)

    def test_read_ublox_raw(self):
        """Count GPS and GLONASS measurements parsed from the segment."""
        constellation_ids = [
            meas.constellation_id
            for group in self.gnss_measurements
            for meas in group
        ]
        gps_total = sum(1 for cid in constellation_ids if cid == ConstellationId.GPS)
        glonass_total = sum(1 for cid in constellation_ids if cid == ConstellationId.GLONASS)

        self.assertEqual(gps_total, 5036)
        self.assertEqual(glonass_total, 3651)

    def test_get_fix(self):
        """Compute position fixes before and after correcting the measurements."""
        astro_dog = AstroDog(cache_dir=Paths.download_cache_root())

        def has_valid_fix(fix):
            # A fix counts only if one was returned and none of its components is zero.
            return len(fix) > 0 and all(p != 0 for p in fix[0])

        fixes_before = 0
        fixes_after = 0
        processed_total = 0
        corrected_total = 0
        estimates = []

        for raw_group in self.gnss_measurements[:self.NUM_TEST_PROCESS_MEAS]:
            processed = process_measurements(raw_group, astro_dog)
            processed_total += len(processed)

            initial_fix = calc_pos_fix(processed)
            if not has_valid_fix(initial_fix):
                continue
            fixes_before += 1

            # Correct the measurements using the first three components of the initial fix.
            corrected = correct_measurements(processed, initial_fix[0][:3], astro_dog)
            corrected_total += len(corrected)

            refined_fix = calc_pos_fix(corrected)
            if has_valid_fix(refined_fix):
                estimates.append(refined_fix[0])
                fixes_after += 1

        mean_fix = np.array(estimates)[:, :3].mean(axis=0)
        np.testing.assert_allclose(mean_fix, [-2452306.662377, -4778343.136806, 3428550.090557], rtol=0, atol=1)

        # Note that there can be fewer corrected measurements than processed ones when some are invalid.
        # However, that is not the case for the current segment.
        self.assertEqual(fixes_before, self.NUM_TEST_PROCESS_MEAS)
        self.assertEqual(fixes_after, self.NUM_TEST_PROCESS_MEAS)
        self.assertEqual(processed_total, 69)
        self.assertEqual(corrected_total, 69)

    @with_processes(['ubloxd'])
    def test_ublox_gps_cutover(self):
        """Replay the raw ublox messages twice and count the ephemeris outputs."""
        # NOTE(review): presumably gives ubloxd time to start before publishing - confirm.
        time.sleep(2)
        gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1)
        raw_pub = messaging.PubMaster(['ubloxRaw'])

        def replay_segment():
            # Publish every recorded raw message, draining the output socket as we go.
            received = []
            for raw_msg in self.ublox_raw:
                raw_pub.send(raw_msg.which(), raw_msg.as_builder())
                time.sleep(0.001)
                received.extend(messaging.drain_sock(gnss_sock))

            # Final drain after a short wait to pick up remaining output.
            time.sleep(0.1)
            received.extend(messaging.drain_sock(gnss_sock))
            return received

        # replay twice to enforce cutover data on rewind
        all_received = replay_segment()
        all_received.extend(replay_segment())

        ephemeris_msgs = [m for m in all_received if m.ubloxGnss.which() == 'ephemeris']
        self.assertEqual(len(ephemeris_msgs), 15)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()