rawgpsd: more robust + simple test (#25977)

* rawgps cleanup

* wait for modem manager

* cleanup

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: e59008bf93
taco
Adeeb Shihadeh 3 years ago committed by GitHub
parent e9ee61769c
commit 946a603f32
  1. 147
      selfdrive/sensord/rawgps/rawgpsd.py
  2. 49
      selfdrive/sensord/rawgps/test_rawgps.py

@ -5,23 +5,32 @@ import signal
import itertools
import math
import time
import subprocess
from typing import NoReturn
from struct import unpack_from, calcsize, pack
import cereal.messaging as messaging
from cereal import log
from system.swaglog import cloudlog
import cereal.messaging as messaging
from laika.gps_time import GPSTime
from system.swaglog import cloudlog
from selfdrive.sensord.rawgps.modemdiag import ModemDiag, DIAG_LOG_F, setup_logs, send_recv
from selfdrive.sensord.rawgps.structs import dict_unpacker
from selfdrive.sensord.rawgps.structs import gps_measurement_report, gps_measurement_report_sv
from selfdrive.sensord.rawgps.structs import glonass_measurement_report, glonass_measurement_report_sv
from selfdrive.sensord.rawgps.structs import oemdre_measurement_report, oemdre_measurement_report_sv
from selfdrive.sensord.rawgps.structs import LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT
from selfdrive.sensord.rawgps.structs import position_report, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT
from selfdrive.sensord.rawgps.structs import (dict_unpacker, position_report,
gps_measurement_report, gps_measurement_report_sv,
glonass_measurement_report, glonass_measurement_report_sv,
oemdre_measurement_report, oemdre_measurement_report_sv,
LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT,
LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT)
DEBUG = int(os.getenv("DEBUG", "0"))==1
LOG_TYPES = [
LOG_GNSS_GPS_MEASUREMENT_REPORT,
LOG_GNSS_GLONASS_MEASUREMENT_REPORT,
LOG_GNSS_OEMDRE_MEASUREMENT_REPORT,
LOG_GNSS_POSITION_REPORT,
]
miscStatusFields = {
"multipathEstimateIsValid": 0,
"directionIsValid": 1,
@ -65,59 +74,43 @@ measurementStatusGlonassFields = {
"glonassTimeMarkValid": 17
}
def main() -> NoReturn:
unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True)
unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True)
unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True)
unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True)
unpack_oemdre_meas, size_oemdre_meas = dict_unpacker(oemdre_measurement_report, True)
unpack_oemdre_meas_sv, size_oemdre_meas_sv = dict_unpacker(oemdre_measurement_report_sv, True)
log_types = [
LOG_GNSS_GPS_MEASUREMENT_REPORT,
LOG_GNSS_GLONASS_MEASUREMENT_REPORT,
LOG_GNSS_OEMDRE_MEASUREMENT_REPORT,
]
pub_types = ['qcomGnss']
unpack_position, _ = dict_unpacker(position_report)
log_types.append(LOG_GNSS_POSITION_REPORT)
pub_types.append("gpsLocation")
def try_setup_logs(diag, log_types):
for _ in range(5):
try:
setup_logs(diag, log_types)
break
except Exception:
cloudlog.exception("setup logs failed, trying again")
else:
raise Exception(f"setup logs failed, {log_types=}")
# connect to modem
diag = ModemDiag()
def mmcli(cmd: str) -> None:
for _ in range(5):
try:
subprocess.check_call(f"mmcli -m 0 {cmd}", shell=True)
break
except subprocess.CalledProcessError:
cloudlog.exception("rawgps.mmcli_command_failed")
else:
raise Exception(f"failed to execute mmcli command {cmd=}")
# NV enable OEMDRE
def setup_quectel(diag: ModemDiag):
# enable OEMDRE in the NV
# TODO: it has to reboot for this to take effect
DIAG_NV_READ_F = 38
DIAG_NV_WRITE_F = 39
NV_GNSS_OEM_FEATURE_MASK = 7165
send_recv(diag, DIAG_NV_WRITE_F, pack('<HI', NV_GNSS_OEM_FEATURE_MASK, 1))
send_recv(diag, DIAG_NV_READ_F, pack('<H', NV_GNSS_OEM_FEATURE_MASK))
opcode, payload = send_recv(diag, DIAG_NV_WRITE_F, pack('<HI', NV_GNSS_OEM_FEATURE_MASK, 1))
opcode, payload = send_recv(diag, DIAG_NV_READ_F, pack('<H', NV_GNSS_OEM_FEATURE_MASK))
def try_setup_logs(diag, log_types):
for _ in range(5):
try:
setup_logs(diag, log_types)
break
except Exception:
pass
def disable_logs(sig, frame):
os.system("mmcli -m 0 --location-disable-gps-raw --location-disable-gps-nmea")
cloudlog.warning("rawgpsd: shutting down")
try_setup_logs(diag, [])
cloudlog.warning("rawgpsd: logs disabled")
sys.exit(0)
signal.signal(signal.SIGINT, disable_logs)
try_setup_logs(diag, log_types)
cloudlog.warning("rawgpsd: setup logs done")
setup_logs(diag, LOG_TYPES)
# disable DPO power savings for more accuracy
os.system("mmcli -m 0 --command='AT+QGPSCFG=\"dpoenable\",0'")
os.system("mmcli -m 0 --location-enable-gps-raw --location-enable-gps-nmea")
mmcli("--command='AT+QGPSCFG=\"dpoenable\",0'")
# don't automatically turn on GNSS on powerup
mmcli("--command='AT+QGPSCFG=\"autogps\",0'")
mmcli("--location-enable-gps-raw --location-enable-gps-nmea")
# enable OEMDRE mode
DIAG_SUBSYS_CMD_F = 75
@ -128,7 +121,7 @@ def main() -> NoReturn:
GPSDIAG_OEM_DRE_ON = 1
# gpsdiag_OemControlReqType
opcode, payload = send_recv(diag, DIAG_SUBSYS_CMD_F, pack('<BHBBIIII',
send_recv(diag, DIAG_SUBSYS_CMD_F, pack('<BHBBIIII',
DIAG_SUBSYS_GPS, # Subsystem Id
CGPS_DIAG_PDAPI_CMD, # Subsystem Command Code
CGPS_OEM_CONTROL, # CGPS Command Code
@ -138,21 +131,65 @@ def main() -> NoReturn:
0,0
))
pm = messaging.PubMaster(pub_types)
def teardown_quectel(diag):
mmcli("--location-disable-gps-raw --location-disable-gps-nmea")
try_setup_logs(diag, [])
def main() -> NoReturn:
unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True)
unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True)
unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True)
unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True)
unpack_oemdre_meas, size_oemdre_meas = dict_unpacker(oemdre_measurement_report, True)
unpack_oemdre_meas_sv, size_oemdre_meas_sv = dict_unpacker(oemdre_measurement_report_sv, True)
unpack_position, _ = dict_unpacker(position_report)
# wait for ModemManager to come up
cloudlog.warning("waiting for modem to come up")
while True:
ret = subprocess.call("mmcli -m 0 --location-status", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)
if ret == 0:
break
time.sleep(0.1)
# connect to modem
diag = ModemDiag()
def cleanup(sig, frame):
cloudlog.warning(f"caught sig {sig}, disabling quectel gps")
teardown_quectel(diag)
cloudlog.warning("quectel cleanup done")
sys.exit(0)
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
setup_quectel(diag)
cloudlog.warning("quectel setup done")
pm = messaging.PubMaster(['qcomGnss', 'gpsLocation'])
while 1:
opcode, payload = diag.recv()
assert opcode == DIAG_LOG_F
(pending_msgs, log_outer_length), inner_log_packet = unpack_from('<BH', payload), payload[calcsize('<BH'):]
if pending_msgs > 0:
cloudlog.debug("have %d pending messages" % pending_msgs)
assert log_outer_length == len(inner_log_packet)
(log_inner_length, log_type, log_time), log_payload = unpack_from('<HHQ', inner_log_packet), inner_log_packet[calcsize('<HHQ'):]
assert log_inner_length == len(inner_log_packet)
if log_type not in log_types:
if log_type not in LOG_TYPES:
continue
if DEBUG:
print("%.4f: got log: %x len %d" % (time.time(), log_type, len(log_payload)))
if log_type == LOG_GNSS_OEMDRE_MEASUREMENT_REPORT:
msg = messaging.new_message('qcomGnss')

@ -0,0 +1,49 @@
#!/usr/bin/env python3
import json
import time
import unittest
import subprocess
import cereal.messaging as messaging
from system.hardware import TICI
from selfdrive.manager.process_config import managed_processes
class TestRawgpsd(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not TICI:
raise unittest.SkipTest
def tearDown(self):
managed_processes['rawgpsd'].stop()
def test_startup_time(self):
for _ in range(5):
sm = messaging.SubMaster(['qcomGnss'])
managed_processes['rawgpsd'].start()
start_time = time.monotonic()
for __ in range(10):
sm.update(1 * 1000)
if sm.updated['qcomGnss']:
break
assert sm.rcv_frame['qcomGnss'] > 0, "rawgpsd didn't start outputting messages in time"
et = time.monotonic() - start_time
assert et < 5, f"rawgpsd took {et:.1f}s to start"
managed_processes['rawgpsd'].stop()
def test_turns_off_gnss(self):
for s in (0.1, 0.5, 1, 5):
managed_processes['rawgpsd'].start()
time.sleep(s)
managed_processes['rawgpsd'].stop()
ls = subprocess.check_output("mmcli -m 0 --location-status --output-json", shell=True, encoding='utf-8')
loc_status = json.loads(ls)
assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'}
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save