#!/usr/bin/env python3
import os
import sys
import signal
import itertools
import math
import time
import requests
import shutil
import subprocess
import datetime
from multiprocessing import Process, Event
from typing import NoReturn, Optional
from struct import unpack_from, calcsize, pack

from cereal import log
import cereal.messaging as messaging
from openpilot.common.gpio import gpio_init, gpio_set
from openpilot.common.retry import retry
from openpilot.system.hardware.tici.pins import GPIO
from openpilot.common.swaglog import cloudlog
from openpilot.system.qcomgpsd.modemdiag import ModemDiag, DIAG_LOG_F, setup_logs, send_recv
from openpilot.system.qcomgpsd.structs import (dict_unpacker, position_report, relist,
                                              gps_measurement_report, gps_measurement_report_sv,
                                              glonass_measurement_report, glonass_measurement_report_sv,
                                              oemdre_measurement_report, oemdre_measurement_report_sv,
                                              oemdre_svpoly_report,
                                              LOG_GNSS_GPS_MEASUREMENT_REPORT,
                                              LOG_GNSS_GLONASS_MEASUREMENT_REPORT,
                                              LOG_GNSS_POSITION_REPORT,
                                              LOG_GNSS_OEMDRE_MEASUREMENT_REPORT,
                                              LOG_GNSS_OEMDRE_SVPOLY_REPORT)

# True only when the DEBUG environment variable is exactly "1".
DEBUG = int(os.getenv("DEBUG", "0"))==1

# Final location of the assistance file, and the scratch path it is streamed
# to before being renamed into place.
ASSIST_DATA_FILE = '/tmp/xtra3grc.bin'
ASSIST_DATA_FILE_DOWNLOAD = ASSIST_DATA_FILE + '.download'
ASSISTANCE_URL = 'http://xtrapath3.izatcloud.net/xtra3grc.bin'

LOG_TYPES = [
    LOG_GNSS_GPS_MEASUREMENT_REPORT,
    LOG_GNSS_GLONASS_MEASUREMENT_REPORT,
    LOG_GNSS_OEMDRE_MEASUREMENT_REPORT,
    LOG_GNSS_POSITION_REPORT,
    LOG_GNSS_OEMDRE_SVPOLY_REPORT,
]

# Field name -> integer index tables for the status words of a measurement.
# NOTE(review): the values look like bit positions tested against the raw
# status integer by the decoding loop further down the file - confirm there.
miscStatusFields = {
    "multipathEstimateIsValid": 0,
    "directionIsValid": 1,
}

# Fields shared by every constellation (indices 8 and 16-23 are not listed here).
measurementStatusFields = {
    "subMillisecondIsValid": 0,
    "subBitTimeIsKnown": 1,
    "satelliteTimeIsKnown": 2,
    "bitEdgeConfirmedFromSignal": 3,
    "measuredVelocity": 4,
    "fineOrCoarseVelocity": 5,
    "lockPointValid": 6,
    "lockPointPositive": 7,

    "lastUpdateFromDifference": 9,
    "lastUpdateFromVelocityDifference": 10,
    "strongIndicationOfCrossCorelation": 11,
    "tentativeMeasurement": 12,
    "measurementNotUsable": 13,
    "sirCheckIsNeeded": 14,
    "probationMode": 15,

    "multipathIndicator": 24,
    "imdJammingIndicator": 25,
    "lteB13TxJammingIndicator": 26,
    "freshMeasurementIndicator": 27,
}

# Additional fields that only apply to GPS measurements.
measurementStatusGPSFields = {
    "gpsRoundRobinRxDiversity": 18,
    "gpsRxDiversity": 19,
    "gpsLowBandwidthRxDiversityCombined": 20,
    "gpsHighBandwidthNu4": 21,
    "gpsHighBandwidthNu8": 22,
    "gpsHighBandwidthUniform": 23,
}

# Additional fields that only apply to GLONASS measurements.
measurementStatusGlonassFields = {
    "glonassMeanderBitEdgeValid": 16,
    "glonassTimeMarkValid": 17
}


@retry(attempts=10, delay=1.0)
def try_setup_logs(diag, logs):
    """Call ``setup_logs(diag, logs)`` under the ``retry`` decorator and return its result."""
    return setup_logs(diag, logs)


@retry(attempts=3, delay=1.0)
def at_cmd(cmd: str) -> Optional[str]:
    """Send the AT command ``cmd`` to the modem through ``mmcli`` and return its output.

    ``cmd`` is interpolated into a shell command line inside single quotes, so
    it must not itself contain a single quote.
    """
    return subprocess.check_output(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True, encoding='utf8')


def gps_enabled() -> bool:
    """Return True when the modem's reply to ``AT+QGPS?`` contains ``QGPS: 1``."""
    # NOTE(review): at_cmd is annotated Optional[str]; a None result would make
    # this `in` test raise TypeError - confirm retry() cannot return None here.
    return "QGPS: 1" in at_cmd("AT+QGPS?")


def download_assistance() -> None:
    """Download the assistance file from ASSISTANCE_URL into ASSIST_DATA_FILE.

    The body is streamed to ASSIST_DATA_FILE_DOWNLOAD and only renamed to
    ASSIST_DATA_FILE once the transfer finished, so the final path never holds
    a partial file. The download is abandoned (and an error logged) as soon as
    more than 1e5 bytes have been written; the partial ``.download`` file is
    left in place in that case.

    Request failures, including non-success HTTP status codes, are logged and
    swallowed; nothing is renamed into place when they occur.
    """
    try:
        # The context manager releases the streamed connection on every exit
        # path, including the early "too large" return below.
        with requests.get(ASSISTANCE_URL, timeout=5, stream=True) as response:
            # Without this an HTTP error page would be saved as assistance data.
            response.raise_for_status()

            with open(ASSIST_DATA_FILE_DOWNLOAD, 'wb') as fp:
                for chunk in response.iter_content(chunk_size=8192):
                    fp.write(chunk)
                    if fp.tell() > 1e5:
                        cloudlog.error("Qcom assistance data larger than expected")
                        return

        os.rename(ASSIST_DATA_FILE_DOWNLOAD, ASSIST_DATA_FILE)

    except requests.exceptions.RequestException:
        cloudlog.exception("Failed to download assistance file")
        return


def downloader_loop(event) -> None:
    """Keep trying to obtain the assistance file until it exists or ``event`` is set.

    Any existing ASSIST_DATA_FILE is removed first. If the environment variable
    QCOM_ALT_ASSISTANCE_PATH names an existing file, it is copied into place
    instead of downloading. Otherwise ``download_assistance`` is attempted,
    waiting up to 10 seconds on ``event`` between attempts.
    """
    # Start from a clean state so a stale file is never picked up.
    if os.path.exists(ASSIST_DATA_FILE):
        os.remove(ASSIST_DATA_FILE)

    alt_path = os.getenv("QCOM_ALT_ASSISTANCE_PATH", None)
    if alt_path is not None and os.path.exists(alt_path):
        shutil.copyfile(alt_path, ASSIST_DATA_FILE)

    try:
        while not os.path.exists(ASSIST_DATA_FILE) and not event.is_set():
            download_assistance()
            # Doubles as the retry delay and as the wake-up for shutdown.
            event.wait(timeout=10)
    except KeyboardInterrupt:
        pass


@retry(attempts=5, delay=0.2, ignore_failure=True)
def inject_assistance():
    """Hand ASSIST_DATA_FILE to the modem via ``mmcli --location-inject-assistance-data``.

    Logs an info message once the ``mmcli`` call has returned successfully.
    """
    cmd = f"mmcli -m any --timeout 30 --location-inject-assistance-data={ASSIST_DATA_FILE}"
    subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True)
    cloudlog.info("successfully loaded assistance data")


@retry(attempts=5, delay=1.0)
def setup_quectel(diag: ModemDiag) -> bool:
    ret = False #
enable OEMDRE in the NV # TODO: it has to reboot for this to take effect DIAG_NV_READ_F = 38 DIAG_NV_WRITE_F = 39 NV_GNSS_OEM_FEATURE_MASK = 7165 send_recv(diag, DIAG_NV_WRITE_F, pack(' NoReturn: unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True) unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True) unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True) unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True) unpack_oemdre_meas, size_oemdre_meas = dict_unpacker(oemdre_measurement_report, True) unpack_oemdre_meas_sv, size_oemdre_meas_sv = dict_unpacker(oemdre_measurement_report_sv, True) unpack_svpoly, _ = dict_unpacker(oemdre_svpoly_report, True) unpack_position, _ = dict_unpacker(position_report) unpack_position, _ = dict_unpacker(position_report) wait_for_modem() stop_download_event = Event() assist_fetch_proc = Process(target=downloader_loop, args=(stop_download_event,)) assist_fetch_proc.start() def cleanup(sig, frame): cloudlog.warning("caught sig disabling quectel gps") gpio_set(GPIO.GNSS_PWR_EN, False) teardown_quectel(diag) cloudlog.warning("quectel cleanup done") stop_download_event.set() assist_fetch_proc.kill() assist_fetch_proc.join() sys.exit(0) signal.signal(signal.SIGINT, cleanup) signal.signal(signal.SIGTERM, cleanup) # connect to modem diag = ModemDiag() r = setup_quectel(diag) want_assistance = not r cloudlog.warning("quectel setup done") gpio_init(GPIO.GNSS_PWR_EN, True) gpio_set(GPIO.GNSS_PWR_EN, True) pm = messaging.PubMaster(['qcomGnss', 'gpsLocation']) while 1: if os.path.exists(ASSIST_DATA_FILE) and want_assistance: setup_quectel(diag) want_assistance = False opcode, payload = diag.recv() if opcode != DIAG_LOG_F: cloudlog.error(f"Unhandled opcode: {opcode}") continue (pending_msgs, log_outer_length), inner_log_packet = unpack_from(' 0: cloudlog.debug("have %d pending messages" % pending_msgs) assert 
log_outer_length == len(inner_log_packet) (log_inner_length, log_type, log_time), log_payload = unpack_from(' 6*SECS_IN_DAY: epoch.week += 1 elif epoch.tow > 6*SECS_IN_DAY and current_gps_time.tow < SECS_IN_DAY: epoch.week -= 1 poly.gpsWeek = epoch.week poly.gpsTow = epoch.tow ''' pm.send('qcomGnss', msg) elif log_type in [LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT]: msg = messaging.new_message('qcomGnss', valid=True) gnss = msg.qcomGnss gnss.logTs = log_time gnss.init('measurementReport') report = gnss.measurementReport if log_type == LOG_GNSS_GPS_MEASUREMENT_REPORT: dat = unpack_gps_meas(log_payload) sats = log_payload[size_gps_meas:] unpack_meas_sv, size_meas_sv = unpack_gps_meas_sv, size_gps_meas_sv report.source = 0 # gps measurement_status_fields = (measurementStatusFields.items(), measurementStatusGPSFields.items()) elif log_type == LOG_GNSS_GLONASS_MEASUREMENT_REPORT: dat = unpack_glonass_meas(log_payload) sats = log_payload[size_glonass_meas:] unpack_meas_sv, size_meas_sv = unpack_glonass_meas_sv, size_glonass_meas_sv report.source = 1 # glonass measurement_status_fields = (measurementStatusFields.items(), measurementStatusGlonassFields.items()) else: raise RuntimeError(f"invalid log_type: {log_type}") for k,v in dat.items(): if k == "version": assert v == 0 elif k == "week": report.gpsWeek = v elif k == "svCount": pass else: setattr(report, k, v) report.init('sv', dat['svCount']) if dat['svCount'] > 0: assert len(sats)//dat['svCount'] == size_meas_sv for i in range(dat['svCount']): sv = report.sv[i] sv.init('measurementStatus') sat = unpack_meas_sv(sats[size_meas_sv*i:size_meas_sv*(i+1)]) for k,v in sat.items(): if k == "parityErrorCount": sv.gpsParityErrorCount = v elif k == "frequencyIndex": sv.glonassFrequencyIndex = v elif k == "hemmingErrorCount": sv.glonassHemmingErrorCount = v elif k == "measurementStatus": for kk,vv in itertools.chain(*measurement_status_fields): setattr(sv.measurementStatus, kk, bool(v & (1<