#!/usr/bin/env python3 import os import sys import signal import itertools import math import time import pycurl import subprocess from datetime import datetime from typing import NoReturn, Optional from struct import unpack_from, calcsize, pack from cereal import log import cereal.messaging as messaging from common.gpio import gpio_init, gpio_set from laika.gps_time import GPSTime from system.hardware.tici.pins import GPIO from system.swaglog import cloudlog from system.sensord.rawgps.modemdiag import ModemDiag, DIAG_LOG_F, setup_logs, send_recv from system.sensord.rawgps.structs import (dict_unpacker, position_report, relist, gps_measurement_report, gps_measurement_report_sv, glonass_measurement_report, glonass_measurement_report_sv, oemdre_measurement_report, oemdre_measurement_report_sv, oemdre_svpoly_report, LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT) DEBUG = int(os.getenv("DEBUG", "0"))==1 LOG_TYPES = [ LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, LOG_GNSS_OEMDRE_MEASUREMENT_REPORT, LOG_GNSS_POSITION_REPORT, LOG_GNSS_OEMDRE_SVPOLY_REPORT, ] miscStatusFields = { "multipathEstimateIsValid": 0, "directionIsValid": 1, } measurementStatusFields = { "subMillisecondIsValid": 0, "subBitTimeIsKnown": 1, "satelliteTimeIsKnown": 2, "bitEdgeConfirmedFromSignal": 3, "measuredVelocity": 4, "fineOrCoarseVelocity": 5, "lockPointValid": 6, "lockPointPositive": 7, "lastUpdateFromDifference": 9, "lastUpdateFromVelocityDifference": 10, "strongIndicationOfCrossCorelation": 11, "tentativeMeasurement": 12, "measurementNotUsable": 13, "sirCheckIsNeeded": 14, "probationMode": 15, "multipathIndicator": 24, "imdJammingIndicator": 25, "lteB13TxJammingIndicator": 26, "freshMeasurementIndicator": 27, } measurementStatusGPSFields = { "gpsRoundRobinRxDiversity": 18, "gpsRxDiversity": 19, "gpsLowBandwidthRxDiversityCombined": 20, "gpsHighBandwidthNu4": 21, "gpsHighBandwidthNu8": 22, "gpsHighBandwidthUniform": 23, } measurementStatusGlonassFields = { "glonassMeanderBitEdgeValid": 16, "glonassTimeMarkValid": 17 } def try_setup_logs(diag, log_types): for _ in range(5): try: setup_logs(diag, log_types) break except Exception: cloudlog.exception("setup logs failed, trying again") else: raise Exception(f"setup logs failed, {log_types=}") def at_cmd(cmd: str) -> Optional[str]: for _ in range(5): try: return subprocess.check_output(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True, encoding='utf8') except subprocess.CalledProcessError: cloudlog.exception("rawgps.mmcli_command_failed") raise Exception(f"failed to execute mmcli command {cmd=}") def gps_enabled() -> bool: try: p = subprocess.check_output("mmcli -m any --command=\"AT+QGPS?\"", shell=True) return b"QGPS: 1" in p except subprocess.CalledProcessError as exc: raise Exception("failed to execute QGPS mmcli command") from exc def download_and_inject_assistance(): assist_data_file = '/tmp/xtra3grc.bin' assistance_url = 'http://xtrapath3.izatcloud.net/xtra3grc.bin' try: # download assistance try: c = pycurl.Curl() c.setopt(pycurl.URL, assistance_url) c.setopt(pycurl.NOBODY, 1) c.setopt(pycurl.CONNECTTIMEOUT, 2) c.perform() bytes_n = c.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD) c.close() if bytes_n > 1e5: cloudlog.error("Qcom assistance data larger than expected") return with open(assist_data_file, 'wb') as fp: c = pycurl.Curl() c.setopt(pycurl.URL, assistance_url) c.setopt(pycurl.CONNECTTIMEOUT, 5) c.setopt(pycurl.WRITEDATA, fp) c.perform() c.close() except pycurl.error: cloudlog.exception("Failed to download assistance file") return # inject into module try: cmd = f"mmcli -m any --timeout 30 --location-inject-assistance-data={assist_data_file}" subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True) cloudlog.info("successfully loaded assistance data") except subprocess.CalledProcessError as e: cloudlog.event( "rawgps.assistance_loading_failed", error=True, cmd=e.cmd, output=e.output, returncode=e.returncode ) finally: if os.path.exists(assist_data_file): os.remove(assist_data_file) def setup_quectel(diag: ModemDiag): # enable OEMDRE in the NV # TODO: it has to reboot for this to take effect DIAG_NV_READ_F = 38 DIAG_NV_WRITE_F = 39 NV_GNSS_OEM_FEATURE_MASK = 7165 send_recv(diag, DIAG_NV_WRITE_F, pack(' NoReturn: unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True) unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True) unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True) unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True) unpack_oemdre_meas, size_oemdre_meas = dict_unpacker(oemdre_measurement_report, True) unpack_oemdre_meas_sv, size_oemdre_meas_sv = dict_unpacker(oemdre_measurement_report_sv, True) unpack_svpoly, _ = dict_unpacker(oemdre_svpoly_report, True) unpack_position, _ = dict_unpacker(position_report) unpack_position, _ = dict_unpacker(position_report) # wait for ModemManager to come up cloudlog.warning("waiting for modem to come up") while True: ret = subprocess.call("mmcli -m any --timeout 10 --command=\"AT+QGPS?\"", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True) if ret == 0: break time.sleep(0.1) # connect to modem diag = ModemDiag() def cleanup(sig, frame): cloudlog.warning(f"caught sig {sig}, disabling quectel gps") gpio_set(GPIO.UBLOX_PWR_EN, False) teardown_quectel(diag) cloudlog.warning("quectel cleanup done") sys.exit(0) signal.signal(signal.SIGINT, cleanup) signal.signal(signal.SIGTERM, cleanup) setup_quectel(diag) cloudlog.warning("quectel setup done") gpio_init(GPIO.UBLOX_PWR_EN, True) gpio_set(GPIO.UBLOX_PWR_EN, True) pm = messaging.PubMaster(['qcomGnss', 'gpsLocation']) while 1: opcode, payload = diag.recv() if opcode != DIAG_LOG_F: cloudlog.error(f"Unhandled opcode: {opcode}") continue (pending_msgs, log_outer_length), inner_log_packet = unpack_from(' 0: cloudlog.debug("have %d pending messages" % pending_msgs) assert log_outer_length == len(inner_log_packet) (log_inner_length, log_type, log_time), log_payload = unpack_from(' 0: assert len(sats)//dat['svCount'] == size_meas_sv for i in range(dat['svCount']): sv = report.sv[i] sv.init('measurementStatus') sat = unpack_meas_sv(sats[size_meas_sv*i:size_meas_sv*(i+1)]) for k,v in sat.items(): if k == "parityErrorCount": sv.gpsParityErrorCount = v elif k == "frequencyIndex": sv.glonassFrequencyIndex = v elif k == "hemmingErrorCount": sv.glonassHemmingErrorCount = v elif k == "measurementStatus": for kk,vv in itertools.chain(*measurement_status_fields): setattr(sv.measurementStatus, kk, bool(v & (1<