Test rawgpsd: Make sure we get fix (#28875)

* Add fix test

* Add laika to test

* Add fix test

---------

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: 771e410262
beeps
Harald Schäfer 2 years ago committed by GitHub
parent b205c1b1f0
commit 6a37f17f65
  1. 53
      system/sensord/rawgps/test_rawgps.py

@ -12,24 +12,46 @@ from system.sensord.rawgps.rawgpsd import at_cmd
from selfdrive.manager.process_config import managed_processes from selfdrive.manager.process_config import managed_processes
GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", '0')))
UPDATE_MS = 100
UPDATES_PER_S = 1000//UPDATE_MS
class TestRawgpsd(unittest.TestCase): class TestRawgpsd(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
if not TICI: if not TICI:
raise unittest.SkipTest raise unittest.SkipTest
cls.sm = messaging.SubMaster(['qcomGnss']) cls.sm_qcom_gnss = messaging.SubMaster(['qcomGnss'])
cls.sm_gps_location = messaging.SubMaster(['gpsLocation'])
cls.sm_gnss_measurements = messaging.SubMaster(['gnssMeasurements'])
def tearDown(self): def tearDown(self):
managed_processes['rawgpsd'].stop() managed_processes['rawgpsd'].stop()
def _wait_for_output(self, t=10): def _wait_for_output(self, t=10):
self.sm.update(0) self.sm_qcom_gnss.update(0)
for __ in range(t*10): for __ in range(t*UPDATES_PER_S):
self.sm.update(100) self.sm_qcom_gnss.update(UPDATE_MS)
if self.sm.updated['qcomGnss']: if self.sm_qcom_gnss.updated['qcomGnss']:
break return True
return self.sm.updated['qcomGnss']
def _wait_for_location(self, t=10):
self.sm_gps_location.update(0)
for __ in range(t*UPDATES_PER_S):
self.sm_gps_location.update(UPDATE_MS)
if self.sm_gps_location.updated['gpsLocation'] and self.sm_gps_location['gpsLocation'].flags:
return True
return False
def _wait_for_laikad_location(self, t=10):
self.sm_gnss_measurements.update(0)
for __ in range(t*UPDATES_PER_S):
self.sm_gnss_measurements.update(UPDATE_MS)
if self.sm_gnss_measurements.updated['gnssMeasurements'] and self.sm_gnss_measurements['gnssMeasurements'].positionECEF.valid:
return True
return False
def test_wait_for_modem(self): def test_wait_for_modem(self):
os.system("sudo systemctl stop ModemManager lte") os.system("sudo systemctl stop ModemManager lte")
@ -76,8 +98,23 @@ class TestRawgpsd(unittest.TestCase):
assert valid_duration == "10080" # should be max time assert valid_duration == "10080" # should be max time
# TODO: time doesn't match up # TODO: time doesn't match up
assert injected_date[1:].startswith(datetime.datetime.now().strftime("%Y/%m/%d")) assert injected_date[1:].startswith(datetime.datetime.utcnow().strftime("%Y/%m/%d"))
@unittest.skipIf(not GOOD_SIGNAL, "No good GPS signal")
def test_fix(self):
# clear assistance data
at_cmd("AT+QGPSDEL=0")
managed_processes['rawgpsd'].start()
#managed_processes['laikad'].start()
assert self._wait_for_location(120)
assert self.sm_gps_location['gpsLocation'].flags == 1
#module_fix = [self.sm['gpsLocation'].latitude, self.sm['gpsLocation'].longitude, self.sm['gpsLocation'].altitude]
#assert self._wait_for_laikad_location(90)
#assert self.sm['gnssMeasurements'].flags == 1
#print(self.sm_gnss_measurements['gnssMeasurements'])
#managed_processes['laikad'].stop()
managed_processes['rawgpsd'].stop()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main(failfast=True) unittest.main(failfast=True)

Loading…
Cancel
Save