diff --git a/system/sensord/rawgps/test_rawgps.py b/system/sensord/rawgps/test_rawgps.py index 0e0d5e0365..96bd422960 100755 --- a/system/sensord/rawgps/test_rawgps.py +++ b/system/sensord/rawgps/test_rawgps.py @@ -12,24 +12,46 @@ from system.sensord.rawgps.rawgpsd import at_cmd from selfdrive.manager.process_config import managed_processes +GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", '0'))) +UPDATE_MS = 100 +UPDATES_PER_S = 1000//UPDATE_MS + + class TestRawgpsd(unittest.TestCase): @classmethod def setUpClass(cls): if not TICI: raise unittest.SkipTest - cls.sm = messaging.SubMaster(['qcomGnss']) + cls.sm_qcom_gnss = messaging.SubMaster(['qcomGnss']) + cls.sm_gps_location = messaging.SubMaster(['gpsLocation']) + cls.sm_gnss_measurements = messaging.SubMaster(['gnssMeasurements']) def tearDown(self): managed_processes['rawgpsd'].stop() def _wait_for_output(self, t=10): - self.sm.update(0) - for __ in range(t*10): - self.sm.update(100) - if self.sm.updated['qcomGnss']: - break - return self.sm.updated['qcomGnss'] + self.sm_qcom_gnss.update(0) + for __ in range(t*UPDATES_PER_S): + self.sm_qcom_gnss.update(UPDATE_MS) + if self.sm_qcom_gnss.updated['qcomGnss']: + return True + + def _wait_for_location(self, t=10): + self.sm_gps_location.update(0) + for __ in range(t*UPDATES_PER_S): + self.sm_gps_location.update(UPDATE_MS) + if self.sm_gps_location.updated['gpsLocation'] and self.sm_gps_location['gpsLocation'].flags: + return True + return False + + def _wait_for_laikad_location(self, t=10): + self.sm_gnss_measurements.update(0) + for __ in range(t*UPDATES_PER_S): + self.sm_gnss_measurements.update(UPDATE_MS) + if self.sm_gnss_measurements.updated['gnssMeasurements'] and self.sm_gnss_measurements['gnssMeasurements'].positionECEF.valid: + return True + return False def test_wait_for_modem(self): os.system("sudo systemctl stop ModemManager lte") @@ -76,8 +98,23 @@ class TestRawgpsd(unittest.TestCase): assert valid_duration == "10080" # should be max time # TODO: time doesn't match up - assert injected_date[1:].startswith(datetime.datetime.now().strftime("%Y/%m/%d")) + assert injected_date[1:].startswith(datetime.datetime.utcnow().strftime("%Y/%m/%d")) + @unittest.skipIf(not GOOD_SIGNAL, "No good GPS signal") + def test_fix(self): + # clear assistance data + at_cmd("AT+QGPSDEL=0") + + managed_processes['rawgpsd'].start() + #managed_processes['laikad'].start() + assert self._wait_for_location(120) + assert self.sm_gps_location['gpsLocation'].flags == 1 + #module_fix = [self.sm['gpsLocation'].latitude, self.sm['gpsLocation'].longitude, self.sm['gpsLocation'].altitude] + #assert self._wait_for_laikad_location(90) + #assert self.sm['gnssMeasurements'].flags == 1 + #print(self.sm_gnss_measurements['gnssMeasurements']) + #managed_processes['laikad'].stop() + managed_processes['rawgpsd'].stop() if __name__ == "__main__": unittest.main(failfast=True)