#!/usr/bin/env python3 import os import sys import signal import itertools import math import time import pycurl import subprocess from datetime import datetime from typing import NoReturn from struct import unpack_from, calcsize, pack from cereal import log import cereal.messaging as messaging from common.gpio import gpio_init, gpio_set from laika.gps_time import GPSTime from system.hardware.tici.pins import GPIO from system.swaglog import cloudlog from system.sensord.rawgps.modemdiag import ModemDiag, DIAG_LOG_F, setup_logs, send_recv from system.sensord.rawgps.structs import (dict_unpacker, position_report, relist, gps_measurement_report, gps_measurement_report_sv, glonass_measurement_report, glonass_measurement_report_sv, oemdre_measurement_report, oemdre_measurement_report_sv, oemdre_svpoly_report, LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT) DEBUG = int(os.getenv("DEBUG", "0"))==1 LOG_TYPES = [ LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT, ] miscStatusFields = { "multipathEstimateIsValid": 0, "directionIsValid": 1, } measurementStatusFields = { "subMillisecondIsValid": 0, "subBitTimeIsKnown": 1, "satelliteTimeIsKnown": 2, "bitEdgeConfirmedFromSignal": 3, "measuredVelocity": 4, "fineOrCoarseVelocity": 5, "lockPointValid": 6, "lockPointPositive": 7, "lastUpdateFromDifference": 9, "lastUpdateFromVelocityDifference": 10, "strongIndicationOfCrossCorelation": 11, "tentativeMeasurement": 12, "measurementNotUsable": 13, "sirCheckIsNeeded": 14, "probationMode": 15, "multipathIndicator": 24, "imdJammingIndicator": 25, "lteB13TxJammingIndicator": 26, "freshMeasurementIndicator": 27, } measurementStatusGPSFields = { "gpsRoundRobinRxDiversity": 18, "gpsRxDiversity": 19, "gpsLowBandwidthRxDiversityCombined": 20, "gpsHighBandwidthNu4": 21, "gpsHighBandwidthNu8": 22, "gpsHighBandwidthUniform": 23, } measurementStatusGlonassFields = { "glonassMeanderBitEdgeValid": 16, "glonassTimeMarkValid": 17 } def try_setup_logs(diag, log_types): for _ in range(5): try: setup_logs(diag, log_types) break except Exception: cloudlog.exception("setup logs failed, trying again") else: raise Exception(f"setup logs failed, {log_types=}") def at_cmd(cmd: str) -> None: for _ in range(5): try: subprocess.check_call(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True) break except subprocess.CalledProcessError: cloudlog.exception("rawgps.mmcli_command_failed") else: raise Exception(f"failed to execute mmcli command {cmd=}") def gps_enabled() -> bool: try: p = subprocess.check_output("mmcli -m any --command=\"AT+QGPS?\"", shell=True) return b"QGPS: 1" in p except subprocess.CalledProcessError as exc: raise Exception("failed to execute QGPS mmcli command") from exc def download_and_inject_assistance(): assist_data_file = '/tmp/xtra3grc.bin' assistance_url = 'http://xtrapath3.izatcloud.net/xtra3grc.bin' try: # download assistance try: c = pycurl.Curl() c.setopt(pycurl.URL, assistance_url) c.setopt(pycurl.NOBODY, 1) c.setopt(pycurl.CONNECTTIMEOUT, 2) c.perform() bytes_n = c.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD) c.close() if bytes_n > 1e5: cloudlog.error("Qcom assistance data larger than expected") return with open(assist_data_file, 'wb') as fp: c = pycurl.Curl() c.setopt(pycurl.URL, assistance_url) c.setopt(pycurl.CONNECTTIMEOUT, 5) c.setopt(pycurl.WRITEDATA, fp) c.perform() c.close() except pycurl.error: cloudlog.exception("Failed to download assistance file") return # inject into module try: cmd = f"mmcli -m any --timeout 30 --location-inject-assistance-data={assist_data_file}" subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True) cloudlog.info("successfully loaded assistance data") except subprocess.CalledProcessError as e: cloudlog.event( "rawgps.assistance_loading_failed", error=True, cmd=e.cmd, output=e.output, returncode=e.returncode ) finally: if os.path.exists(assist_data_file): os.remove(assist_data_file) def setup_quectel(diag: ModemDiag): # enable OEMDRE in the NV # TODO: it has to reboot for this to take effect DIAG_NV_READ_F = 38 DIAG_NV_WRITE_F = 39 NV_GNSS_OEM_FEATURE_MASK = 7165 send_recv(diag, DIAG_NV_WRITE_F, pack(' NoReturn: unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True) unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True) unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True) unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True) unpack_oemdre_meas, size_oemdre_meas = dict_unpacker(oemdre_measurement_report, True) unpack_oemdre_meas_sv, size_oemdre_meas_sv = dict_unpacker(oemdre_measurement_report_sv, True) unpack_svpoly, _ = dict_unpacker(oemdre_svpoly_report, True) unpack_position, _ = dict_unpacker(position_report) unpack_position, _ = dict_unpacker(position_report) # wait for ModemManager to come up cloudlog.warning("waiting for modem to come up") while True: ret = subprocess.call("mmcli -m any --timeout 10 --command=\"AT+QGPS?\"", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True) if ret == 0: break time.sleep(0.1) # connect to modem diag = ModemDiag() def cleanup(sig, frame): cloudlog.warning(f"caught sig {sig}, disabling quectel gps") gpio_set(GPIO.UBLOX_PWR_EN, False) teardown_quectel(diag) cloudlog.warning("quectel cleanup done") sys.exit(0) signal.signal(signal.SIGINT, cleanup) signal.signal(signal.SIGTERM, cleanup) setup_quectel(diag) cloudlog.warning("quectel setup done") gpio_init(GPIO.UBLOX_PWR_EN, True) gpio_set(GPIO.UBLOX_PWR_EN, True) pm = messaging.PubMaster(['qcomGnss', 'gpsLocation']) while 1: opcode, payload = diag.recv() if opcode != DIAG_LOG_F: cloudlog.error(f"Unhandled opcode: {opcode}") continue (pending_msgs, log_outer_length), inner_log_packet = unpack_from(' 0: cloudlog.debug("have %d pending messages" % pending_msgs) assert log_outer_length == len(inner_log_packet) (log_inner_length, log_type, log_time), log_payload = unpack_from(' 0: assert len(sats)//dat['svCount'] == size_meas_sv for i in range(dat['svCount']): sv = report.sv[i] sv.init('measurementStatus') sat = unpack_meas_sv(sats[size_meas_sv*i:size_meas_sv*(i+1)]) for k,v in sat.items(): if k == "parityErrorCount": sv.gpsParityErrorCount = v elif k == "frequencyIndex": sv.glonassFrequencyIndex = v elif k == "hemmingErrorCount": sv.glonassHemmingErrorCount = v elif k == "measurementStatus": for kk,vv in itertools.chain(*measurement_status_fields): setattr(sv.measurementStatus, kk, bool(v & (1<