#!/usr/bin/env python3 import os import sys import signal import itertools import math import time import subprocess from typing import NoReturn from struct import unpack_from, calcsize, pack from cereal import log import cereal.messaging as messaging from common.gpio import gpio_init, gpio_set from laika.gps_time import GPSTime from system.hardware.tici.pins import GPIO from system.swaglog import cloudlog from system.sensord.rawgps.modemdiag import ModemDiag, DIAG_LOG_F, setup_logs, send_recv from system.sensord.rawgps.structs import (dict_unpacker, position_report, relist, gps_measurement_report, gps_measurement_report_sv, glonass_measurement_report, glonass_measurement_report_sv, oemdre_measurement_report, oemdre_measurement_report_sv, oemdre_svpoly_report, LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT) DEBUG = int(os.getenv("DEBUG", "0"))==1 LOG_TYPES = [ LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT, ] miscStatusFields = { "multipathEstimateIsValid": 0, "directionIsValid": 1, } measurementStatusFields = { "subMillisecondIsValid": 0, "subBitTimeIsKnown": 1, "satelliteTimeIsKnown": 2, "bitEdgeConfirmedFromSignal": 3, "measuredVelocity": 4, "fineOrCoarseVelocity": 5, "lockPointValid": 6, "lockPointPositive": 7, "lastUpdateFromDifference": 9, "lastUpdateFromVelocityDifference": 10, "strongIndicationOfCrossCorelation": 11, "tentativeMeasurement": 12, "measurementNotUsable": 13, "sirCheckIsNeeded": 14, "probationMode": 15, "multipathIndicator": 24, "imdJammingIndicator": 25, "lteB13TxJammingIndicator": 26, "freshMeasurementIndicator": 27, } measurementStatusGPSFields = { "gpsRoundRobinRxDiversity": 18, "gpsRxDiversity": 19, "gpsLowBandwidthRxDiversityCombined": 20, "gpsHighBandwidthNu4": 21, "gpsHighBandwidthNu8": 22, "gpsHighBandwidthUniform": 23, } measurementStatusGlonassFields = { "glonassMeanderBitEdgeValid": 16, "glonassTimeMarkValid": 17 } def try_setup_logs(diag, log_types): for _ in range(5): try: setup_logs(diag, log_types) break except Exception: cloudlog.exception("setup logs failed, trying again") else: raise Exception(f"setup logs failed, {log_types=}") def at_cmd(cmd: str) -> None: for _ in range(5): try: subprocess.check_call(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True) break except subprocess.CalledProcessError: cloudlog.exception("rawgps.mmcli_command_failed") else: raise Exception(f"failed to execute mmcli command {cmd=}") def gps_enabled() -> bool: try: p = subprocess.check_output("mmcli -m any --command=\"AT+QGPS?\"", shell=True) return b"QGPS: 1" in p except subprocess.CalledProcessError as exc: raise Exception("failed to execute QGPS mmcli command") from exc def setup_quectel(diag: ModemDiag): # enable OEMDRE in the NV # TODO: it has to reboot for this to take effect DIAG_NV_READ_F = 38 DIAG_NV_WRITE_F = 39 NV_GNSS_OEM_FEATURE_MASK = 7165 send_recv(diag, DIAG_NV_WRITE_F, pack(' NoReturn: unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True) unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True) unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True) unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True) unpack_oemdre_meas, size_oemdre_meas = dict_unpacker(oemdre_measurement_report, True) unpack_oemdre_meas_sv, size_oemdre_meas_sv = dict_unpacker(oemdre_measurement_report_sv, True) unpack_svpoly, _ = dict_unpacker(oemdre_svpoly_report, True) unpack_position, _ = dict_unpacker(position_report) unpack_position, _ = dict_unpacker(position_report) # wait for ModemManager to come up cloudlog.warning("waiting for modem to come up") while True: ret = subprocess.call("mmcli -m any --timeout 10 --command=\"AT+QGPS?\"", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True) if ret == 0: break time.sleep(0.1) # connect to modem diag = ModemDiag() def cleanup(sig, frame): cloudlog.warning(f"caught sig {sig}, disabling quectel gps") gpio_set(GPIO.UBLOX_PWR_EN, False) teardown_quectel(diag) cloudlog.warning("quectel cleanup done") sys.exit(0) signal.signal(signal.SIGINT, cleanup) signal.signal(signal.SIGTERM, cleanup) setup_quectel(diag) cloudlog.warning("quectel setup done") gpio_init(GPIO.UBLOX_PWR_EN, True) gpio_set(GPIO.UBLOX_PWR_EN, True) pm = messaging.PubMaster(['qcomGnss', 'gpsLocation']) while 1: opcode, payload = diag.recv() if opcode != DIAG_LOG_F: cloudlog.error(f"Unhandled opcode: {opcode}") continue (pending_msgs, log_outer_length), inner_log_packet = unpack_from(' 0: cloudlog.debug("have %d pending messages" % pending_msgs) assert log_outer_length == len(inner_log_packet) (log_inner_length, log_type, log_time), log_payload = unpack_from(' 0: assert len(sats)//dat['svCount'] == size_meas_sv for i in range(dat['svCount']): sv = report.sv[i] sv.init('measurementStatus') sat = unpack_meas_sv(sats[size_meas_sv*i:size_meas_sv*(i+1)]) for k,v in sat.items(): if k == "parityErrorCount": sv.gpsParityErrorCount = v elif k == "frequencyIndex": sv.glonassFrequencyIndex = v elif k == "hemmingErrorCount": sv.glonassHemmingErrorCount = v elif k == "measurementStatus": for kk,vv in itertools.chain(*measurement_status_fields): setattr(sv.measurementStatus, kk, bool(v & (1<