Test gnssUblox message processing (#24404)

* Add simple test for processing ublox messages

* Add simple test for processing ublox messages

* Update selfdrive/locationd/test/test_ublox_processing.py

Co-authored-by: Willem Melching <willem.melching@gmail.com>

* Update

* Push laika ref

Co-authored-by: Willem Melching <willem.melching@gmail.com>
old-commit-hash: e6f9f12d1c
taco
Gijs Koning 3 years ago committed by GitHub
parent a5d66024ec
commit 0010b565f5
  1. 80
      selfdrive/locationd/test/test_ublox_processing.py

@ -0,0 +1,80 @@
import unittest
import numpy as np
from laika import AstroDog
from laika.helpers import UbloxGnssId
from laika.raw_gnss import calc_pos_fix, correct_measurements, process_measurements, read_raw_ublox
from selfdrive.test.openpilotci import get_url
from tools.lib.logreader import LogReader
def get_gnss_measurements(log_reader):
gnss_measurements = []
for msg in log_reader:
if msg.which() == "ubloxGnss":
ublox_msg = msg.ubloxGnss
if ublox_msg.which == 'measurementReport':
report = ublox_msg.measurementReport
if len(report.measurements) > 0:
gnss_measurements.append(read_raw_ublox(report))
return gnss_measurements
class TestUbloxProcessing(unittest.TestCase):
NUM_TEST_PROCESS_MEAS = 10
@classmethod
def setUpClass(cls):
lr = LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", 0))
cls.gnss_measurements = get_gnss_measurements(lr)
def test_read_ublox_raw(self):
count_gps = 0
count_glonass = 0
for measurements in self.gnss_measurements:
for m in measurements:
if m.ublox_gnss_id == UbloxGnssId.GPS:
count_gps += 1
elif m.ublox_gnss_id == UbloxGnssId.GLONASS:
count_glonass += 1
self.assertEqual(count_gps, 5036)
self.assertEqual(count_glonass, 3651)
def test_get_fix(self):
dog = AstroDog()
position_fix_found = 0
count_processed_measurements = 0
count_corrected_measurements = 0
position_fix_found_after_correcting = 0
pos_ests = []
for measurements in self.gnss_measurements[:self.NUM_TEST_PROCESS_MEAS]:
processed_meas = process_measurements(measurements, dog)
count_processed_measurements += len(processed_meas)
pos_fix = calc_pos_fix(processed_meas)
if len(pos_fix) > 0 and all(pos_fix[0] != 0):
position_fix_found += 1
corrected_meas = correct_measurements(processed_meas, pos_fix[0][:3], dog)
count_corrected_measurements += len(corrected_meas)
pos_fix = calc_pos_fix(corrected_meas)
if len(pos_fix) > 0 and all(pos_fix[0] != 0):
pos_ests.append(pos_fix[0])
position_fix_found_after_correcting += 1
mean_fix = np.mean(np.array(pos_ests)[:, :3], axis=0)
np.testing.assert_allclose(mean_fix, [-2452306.662377, -4778343.136806, 3428550.090557], rtol=0, atol=1)
# Note that can happen that there are less corrected measurements compared to processed when they are invalid.
# However, not for the current segment
self.assertEqual(position_fix_found, self.NUM_TEST_PROCESS_MEAS)
self.assertEqual(position_fix_found_after_correcting, self.NUM_TEST_PROCESS_MEAS)
self.assertEqual(count_processed_measurements, 69)
self.assertEqual(count_corrected_measurements, 69)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save