diff --git a/Jenkinsfile b/Jenkinsfile index a90bad9ec7..3c3e72e422 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -139,6 +139,7 @@ pipeline { ["test camerad", "python system/camerad/test/test_camerad.py"], ["test exposure", "python system/camerad/test/test_exposure.py"], ["test amp", "python system/hardware/tici/tests/test_amplifier.py"], + ["test rawgpsd", "python system/sensord/rawgps/test_rawgps.py"], ]) } } diff --git a/system/sensord/rawgps/rawgpsd.py b/system/sensord/rawgps/rawgpsd.py index 46f5cf83c9..8e3e160be2 100755 --- a/system/sensord/rawgps/rawgpsd.py +++ b/system/sensord/rawgps/rawgpsd.py @@ -8,7 +8,7 @@ import time import pycurl import subprocess from datetime import datetime -from typing import NoReturn +from typing import NoReturn, Optional from struct import unpack_from, calcsize, pack from cereal import log @@ -91,15 +91,13 @@ def try_setup_logs(diag, log_types): else: raise Exception(f"setup logs failed, {log_types=}") -def at_cmd(cmd: str) -> None: +def at_cmd(cmd: str) -> Optional[str]: for _ in range(5): try: - subprocess.check_call(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True) - break + return subprocess.check_output(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True, encoding='utf8') except subprocess.CalledProcessError: cloudlog.exception("rawgps.mmcli_command_failed") - else: - raise Exception(f"failed to execute mmcli command {cmd=}") + raise Exception(f"failed to execute mmcli command {cmd=}") def gps_enabled() -> bool: @@ -183,7 +181,7 @@ def setup_quectel(diag: ModemDiag): #at_cmd("AT+QGPSXTRADATA?") time_str = datetime.utcnow().strftime("%Y/%m/%d,%H:%M:%S") at_cmd(f"AT+QGPSXTRATIME=0,\"{time_str}\",1,1,1000") - + at_cmd("AT+QGPSCFG=\"outport\",\"usbnmea\"") at_cmd("AT+QGPS=1") diff --git a/system/sensord/rawgps/test_rawgps.py b/system/sensord/rawgps/test_rawgps.py index 5bd0833955..773d7350dd 100755 --- a/system/sensord/rawgps/test_rawgps.py +++ b/system/sensord/rawgps/test_rawgps.py @@ -1,11 +1,14 @@ #!/usr/bin/env python3 +import os import json import time +import datetime import unittest import subprocess import cereal.messaging as messaging from system.hardware import TICI +from system.sensord.rawgps.rawgpsd import at_cmd from selfdrive.manager.process_config import managed_processes @@ -15,20 +18,33 @@ class TestRawgpsd(unittest.TestCase): if not TICI: raise unittest.SkipTest + cls.sm = messaging.SubMaster(['qcomGnss']) + def tearDown(self): managed_processes['rawgpsd'].stop() + def _wait_for_output(self, t=10): + self.sm.update(0) + for __ in range(t*10): + self.sm.update(100) + if self.sm.updated['qcomGnss']: + break + return self.sm.updated['qcomGnss'] + + def test_wait_for_modem(self): + os.system("sudo systemctl stop ModemManager lte") + managed_processes['rawgpsd'].start() + assert not self._wait_for_output(10) + + os.system("sudo systemctl restart ModemManager lte") + assert self._wait_for_output(30) + def test_startup_time(self): for _ in range(5): - sm = messaging.SubMaster(['qcomGnss']) managed_processes['rawgpsd'].start() start_time = time.monotonic() - for __ in range(10): - sm.update(1 * 1000) - if sm.updated['qcomGnss']: - break - assert sm.rcv_frame['qcomGnss'] > 0, "rawgpsd didn't start outputting messages in time" + assert self._wait_for_output(), "rawgpsd didn't start outputting messages in time" et = time.monotonic() - start_time assert et < 5, f"rawgpsd took {et:.1f}s to start" @@ -44,6 +60,24 @@ class TestRawgpsd(unittest.TestCase): loc_status = json.loads(ls) assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'} + def test_assistance_loading(self): + # clear assistance data + at_cmd("AT+QGPSDEL=0") + + managed_processes['rawgpsd'].start() + assert self._wait_for_output(10) + managed_processes['rawgpsd'].stop() + + # after QGPSDEL: '+QGPSXTRADATA: 0,"1980/01/05,19:00:00"' + # after loading: '+QGPSXTRADATA: 10080,"2023/06/24,19:00:00"' + out = at_cmd("AT+QGPSXTRADATA?") + out = out.split("+QGPSXTRADATA:")[1].split("'")[0].strip() + valid_duration, injected_date = out.split(",", 1) + assert valid_duration == "10080" # should be max time + + # TODO: time doesn't match up + assert injected_date[1:].startswith(datetime.datetime.now().strftime("%Y/%m/%d")) + if __name__ == "__main__": - unittest.main() + unittest.main(failfast=True)