rawgpsd tests (#28683)

* rawgpsd tests

* test xtra

* check freq

* fix

* restart lte

---------

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: 5992bf4acc
beeps
Adeeb Shihadeh 2 years ago committed by GitHub
parent 4be4904143
commit 8d80b8445c
  1. 1
      Jenkinsfile
  2. 12
      system/sensord/rawgps/rawgpsd.py
  3. 48
      system/sensord/rawgps/test_rawgps.py

1
Jenkinsfile vendored

@ -139,6 +139,7 @@ pipeline {
["test camerad", "python system/camerad/test/test_camerad.py"], ["test camerad", "python system/camerad/test/test_camerad.py"],
["test exposure", "python system/camerad/test/test_exposure.py"], ["test exposure", "python system/camerad/test/test_exposure.py"],
["test amp", "python system/hardware/tici/tests/test_amplifier.py"], ["test amp", "python system/hardware/tici/tests/test_amplifier.py"],
["test rawgpsd", "python system/sensord/rawgps/test_rawgps.py"],
]) ])
} }
} }

@ -8,7 +8,7 @@ import time
import pycurl import pycurl
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from typing import NoReturn from typing import NoReturn, Optional
from struct import unpack_from, calcsize, pack from struct import unpack_from, calcsize, pack
from cereal import log from cereal import log
@ -91,15 +91,13 @@ def try_setup_logs(diag, log_types):
else: else:
raise Exception(f"setup logs failed, {log_types=}") raise Exception(f"setup logs failed, {log_types=}")
def at_cmd(cmd: str) -> None: def at_cmd(cmd: str) -> Optional[str]:
for _ in range(5): for _ in range(5):
try: try:
subprocess.check_call(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True) return subprocess.check_output(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True, encoding='utf8')
break
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
cloudlog.exception("rawgps.mmcli_command_failed") cloudlog.exception("rawgps.mmcli_command_failed")
else: raise Exception(f"failed to execute mmcli command {cmd=}")
raise Exception(f"failed to execute mmcli command {cmd=}")
def gps_enabled() -> bool: def gps_enabled() -> bool:
@ -183,7 +181,7 @@ def setup_quectel(diag: ModemDiag):
#at_cmd("AT+QGPSXTRADATA?") #at_cmd("AT+QGPSXTRADATA?")
time_str = datetime.utcnow().strftime("%Y/%m/%d,%H:%M:%S") time_str = datetime.utcnow().strftime("%Y/%m/%d,%H:%M:%S")
at_cmd(f"AT+QGPSXTRATIME=0,\"{time_str}\",1,1,1000") at_cmd(f"AT+QGPSXTRATIME=0,\"{time_str}\",1,1,1000")
at_cmd("AT+QGPSCFG=\"outport\",\"usbnmea\"") at_cmd("AT+QGPSCFG=\"outport\",\"usbnmea\"")
at_cmd("AT+QGPS=1") at_cmd("AT+QGPS=1")

@ -1,11 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
import json import json
import time import time
import datetime
import unittest import unittest
import subprocess import subprocess
import cereal.messaging as messaging import cereal.messaging as messaging
from system.hardware import TICI from system.hardware import TICI
from system.sensord.rawgps.rawgpsd import at_cmd
from selfdrive.manager.process_config import managed_processes from selfdrive.manager.process_config import managed_processes
@ -15,20 +18,33 @@ class TestRawgpsd(unittest.TestCase):
if not TICI: if not TICI:
raise unittest.SkipTest raise unittest.SkipTest
cls.sm = messaging.SubMaster(['qcomGnss'])
def tearDown(self): def tearDown(self):
managed_processes['rawgpsd'].stop() managed_processes['rawgpsd'].stop()
def _wait_for_output(self, t=10):
self.sm.update(0)
for __ in range(t*10):
self.sm.update(100)
if self.sm.updated['qcomGnss']:
break
return self.sm.updated['qcomGnss']
def test_wait_for_modem(self):
os.system("sudo systemctl stop ModemManager lte")
managed_processes['rawgpsd'].start()
assert not self._wait_for_output(10)
os.system("sudo systemctl restart ModemManager lte")
assert self._wait_for_output(30)
def test_startup_time(self): def test_startup_time(self):
for _ in range(5): for _ in range(5):
sm = messaging.SubMaster(['qcomGnss'])
managed_processes['rawgpsd'].start() managed_processes['rawgpsd'].start()
start_time = time.monotonic() start_time = time.monotonic()
for __ in range(10): assert self._wait_for_output(), "rawgpsd didn't start outputting messages in time"
sm.update(1 * 1000)
if sm.updated['qcomGnss']:
break
assert sm.rcv_frame['qcomGnss'] > 0, "rawgpsd didn't start outputting messages in time"
et = time.monotonic() - start_time et = time.monotonic() - start_time
assert et < 5, f"rawgpsd took {et:.1f}s to start" assert et < 5, f"rawgpsd took {et:.1f}s to start"
@ -44,6 +60,24 @@ class TestRawgpsd(unittest.TestCase):
loc_status = json.loads(ls) loc_status = json.loads(ls)
assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'} assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'}
def test_assistance_loading(self):
# clear assistance data
at_cmd("AT+QGPSDEL=0")
managed_processes['rawgpsd'].start()
assert self._wait_for_output(10)
managed_processes['rawgpsd'].stop()
# after QGPSDEL: '+QGPSXTRADATA: 0,"1980/01/05,19:00:00"'
# after loading: '+QGPSXTRADATA: 10080,"2023/06/24,19:00:00"'
out = at_cmd("AT+QGPSXTRADATA?")
out = out.split("+QGPSXTRADATA:")[1].split("'")[0].strip()
valid_duration, injected_date = out.split(",", 1)
assert valid_duration == "10080" # should be max time
# TODO: time doesn't match up
assert injected_date[1:].startswith(datetime.datetime.now().strftime("%Y/%m/%d"))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main(failfast=True)

Loading…
Cancel
Save