You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							80 lines
						
					
					
						
							2.8 KiB
						
					
					
				
			
		
		
	
	
							80 lines
						
					
					
						
							2.8 KiB
						
					
					
				import unittest
 | 
						|
 | 
						|
import numpy as np
 | 
						|
 | 
						|
from laika import AstroDog
 | 
						|
from laika.helpers import ConstellationId
 | 
						|
from laika.raw_gnss import calc_pos_fix, correct_measurements, process_measurements, read_raw_ublox
 | 
						|
from selfdrive.test.openpilotci import get_url
 | 
						|
from tools.lib.logreader import LogReader
 | 
						|
 | 
						|
 | 
						|
def get_gnss_measurements(log_reader):
 | 
						|
  gnss_measurements = []
 | 
						|
  for msg in log_reader:
 | 
						|
    if msg.which() == "ubloxGnss":
 | 
						|
      ublox_msg = msg.ubloxGnss
 | 
						|
      if ublox_msg.which == 'measurementReport':
 | 
						|
        report = ublox_msg.measurementReport
 | 
						|
        if len(report.measurements) > 0:
 | 
						|
          gnss_measurements.append(read_raw_ublox(report))
 | 
						|
  return gnss_measurements
 | 
						|
 | 
						|
 | 
						|
class TestUbloxProcessing(unittest.TestCase):
 | 
						|
  NUM_TEST_PROCESS_MEAS = 10
 | 
						|
 | 
						|
  @classmethod
 | 
						|
  def setUpClass(cls):
 | 
						|
    lr = LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", 0))
 | 
						|
    cls.gnss_measurements = get_gnss_measurements(lr)
 | 
						|
 | 
						|
  def test_read_ublox_raw(self):
 | 
						|
    count_gps = 0
 | 
						|
    count_glonass = 0
 | 
						|
    for measurements in self.gnss_measurements:
 | 
						|
      for m in measurements:
 | 
						|
        if m.constellation_id == ConstellationId.GPS:
 | 
						|
          count_gps += 1
 | 
						|
        elif m.constellation_id == ConstellationId.GLONASS:
 | 
						|
          count_glonass += 1
 | 
						|
 | 
						|
    self.assertEqual(count_gps, 5036)
 | 
						|
    self.assertEqual(count_glonass, 3651)
 | 
						|
 | 
						|
  def test_get_fix(self):
 | 
						|
    dog = AstroDog()
 | 
						|
    position_fix_found = 0
 | 
						|
    count_processed_measurements = 0
 | 
						|
    count_corrected_measurements = 0
 | 
						|
    position_fix_found_after_correcting = 0
 | 
						|
 | 
						|
    pos_ests = []
 | 
						|
    for measurements in self.gnss_measurements[:self.NUM_TEST_PROCESS_MEAS]:
 | 
						|
      processed_meas = process_measurements(measurements, dog)
 | 
						|
      count_processed_measurements += len(processed_meas)
 | 
						|
      pos_fix = calc_pos_fix(processed_meas)
 | 
						|
      if len(pos_fix) > 0 and all(pos_fix[0] != 0):
 | 
						|
        position_fix_found += 1
 | 
						|
 | 
						|
        corrected_meas = correct_measurements(processed_meas, pos_fix[0][:3], dog)
 | 
						|
        count_corrected_measurements += len(corrected_meas)
 | 
						|
 | 
						|
        pos_fix = calc_pos_fix(corrected_meas)
 | 
						|
        if len(pos_fix) > 0 and all(pos_fix[0] != 0):
 | 
						|
          pos_ests.append(pos_fix[0])
 | 
						|
          position_fix_found_after_correcting += 1
 | 
						|
 | 
						|
    mean_fix = np.mean(np.array(pos_ests)[:, :3], axis=0)
 | 
						|
    np.testing.assert_allclose(mean_fix, [-2452306.662377, -4778343.136806, 3428550.090557], rtol=0, atol=1)
 | 
						|
 | 
						|
    # Note that can happen that there are less corrected measurements compared to processed when they are invalid.
 | 
						|
    # However, not for the current segment
 | 
						|
    self.assertEqual(position_fix_found, self.NUM_TEST_PROCESS_MEAS)
 | 
						|
    self.assertEqual(position_fix_found_after_correcting, self.NUM_TEST_PROCESS_MEAS)
 | 
						|
    self.assertEqual(count_processed_measurements, 69)
 | 
						|
    self.assertEqual(count_corrected_measurements, 69)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  unittest.main()
 | 
						|
 |