@ -8,6 +8,7 @@ import time
import pycurl
import subprocess
from datetime import datetime
from multiprocessing import Process
from typing import NoReturn , Optional
from struct import unpack_from , calcsize , pack
@ -29,6 +30,8 @@ from system.sensord.rawgps.structs import (dict_unpacker, position_report, relis
LOG_GNSS_OEMDRE_SVPOLY_REPORT )
DEBUG = int ( os . getenv ( " DEBUG " , " 0 " ) ) == 1
ASSIST_DATA_FILE = ' /tmp/xtra3grc.bin '
ASSISTANCE_URL = ' http://xtrapath3.izatcloud.net/xtra3grc.bin '
LOG_TYPES = [
LOG_GNSS_GPS_MEASUREMENT_REPORT ,
@ -84,7 +87,7 @@ measurementStatusGlonassFields = {
def try_setup_logs ( diag , log_types ) :
for _ in range ( 5 ) :
for _ in range ( 3 ) :
try :
setup_logs ( diag , log_types )
break
@ -94,11 +97,12 @@ def try_setup_logs(diag, log_types):
raise Exception ( f " setup logs failed, { log_types =} " )
def at_cmd ( cmd : str ) - > Optional [ str ] :
for _ in range ( 5 ) :
for _ in range ( 3 ) :
try :
return subprocess . check_output ( f " mmcli -m any --timeout 30 --command= ' { cmd } ' " , shell = True , encoding = ' utf8 ' )
except subprocess . CalledProcessError :
cloudlog . exception ( " rawgps.mmcli_command_failed " )
time . sleep ( 1.0 )
raise Exception ( f " failed to execute mmcli command { cmd =} " )
@ -109,53 +113,45 @@ def gps_enabled() -> bool:
except subprocess . CalledProcessError as exc :
raise Exception ( " failed to execute QGPS mmcli command " ) from exc
def download_and_inject_assistance ( ) :
assist_data_file = ' /tmp/xtra3grc.bin '
assistance_url = ' http://xtrapath3.izatcloud.net/xtra3grc.bin '
def download_assistance ( ) :
try :
# download assistance
try :
c = pycurl . Curl ( )
c . setopt ( pycurl . URL , ASSISTANCE_URL )
c . setopt ( pycurl . NOBODY , 1 )
c . setopt ( pycurl . CONNECTTIMEOUT , 2 )
c . perform ( )
bytes_n = c . getinfo ( pycurl . CONTENT_LENGTH_DOWNLOAD )
c . close ( )
if bytes_n > 1e5 :
cloudlog . error ( " Qcom assistance data larger than expected " )
return
with open ( ASSIST_DATA_FILE , ' wb ' ) as fp :
c = pycurl . Curl ( )
c . setopt ( pycurl . URL , assistance_url )
c . setopt ( pycurl . NOBODY , 1 )
c . setopt ( pycurl . CONNECTTIMEOUT , 2 )
c . setopt ( pycurl . URL , ASSISTANCE_URL )
c . setopt ( pycurl . CONNECTTIMEOUT , 5 )
c . setopt ( pycurl . WRITEDATA , fp )
c . perform ( )
bytes_n = c . getinfo ( pycurl . CONTENT_LENGTH_DOWNLOAD )
c . close ( )
if bytes_n > 1e5 :
cloudlog . error ( " Qcom assistance data larger than expected " )
return
with open ( assist_data_file , ' wb ' ) as fp :
c = pycurl . Curl ( )
c . setopt ( pycurl . URL , assistance_url )
c . setopt ( pycurl . CONNECTTIMEOUT , 5 )
c . setopt ( pycurl . WRITEDATA , fp )
c . perform ( )
c . close ( )
except pycurl . error :
cloudlog . exception ( " Failed to download assistance file " )
return
except pycurl . error :
cloudlog . exception ( " Failed to download assistance file " )
return
# inject into module
try :
cmd = f " mmcli -m any --timeout 30 --location-inject-assistance-data= { assist_data_file } "
subprocess . check_output ( cmd , stderr = subprocess . PIPE , shell = True )
cloudlog . info ( " successfully loaded assistance data " )
except subprocess . CalledProcessError as e :
cloudlog . event (
" rawgps.assistance_loading_failed " ,
error = True ,
cmd = e . cmd ,
output = e . output ,
returncode = e . returncode
)
finally :
if os . path . exists ( assist_data_file ) :
os . remove ( assist_data_file )
def inject_assistance ( ) :
try :
cmd = f " mmcli -m any --timeout 30 --location-inject-assistance-data= { ASSIST_DATA_FILE } "
subprocess . check_output ( cmd , stderr = subprocess . PIPE , shell = True )
cloudlog . info ( " successfully loaded assistance data " )
except subprocess . CalledProcessError as e :
cloudlog . event (
" rawgps.assistance_loading_failed " ,
error = True ,
cmd = e . cmd ,
output = e . output ,
returncode = e . returncode
)
def setup_quectel ( diag : ModemDiag ) :
# enable OEMDRE in the NV
@ -180,7 +176,8 @@ def setup_quectel(diag: ModemDiag):
# Do internet assistance
at_cmd ( " AT+QGPSXTRA=1 " )
at_cmd ( " AT+QGPSSUPLURL= \" NULL \" " )
download_and_inject_assistance ( )
if os . path . exists ( ASSIST_DATA_FILE ) :
inject_assistance ( )
#at_cmd("AT+QGPSXTRADATA?")
time_str = datetime . utcnow ( ) . strftime ( " % Y/ % m/ %d , % H: % M: % S " )
at_cmd ( f " AT+QGPSXTRATIME=0, \" { time_str } \" ,1,1,1000 " )
@ -214,6 +211,15 @@ def teardown_quectel(diag):
try_setup_logs ( diag , [ ] )
def wait_for_modem ( ) :
cloudlog . warning ( " waiting for modem to come up " )
while True :
ret = subprocess . call ( " mmcli -m any --timeout 10 --command= \" AT+QGPS? \" " , stdout = subprocess . DEVNULL , stderr = subprocess . DEVNULL , shell = True )
if ret == 0 :
return
time . sleep ( 0.1 )
def main ( ) - > NoReturn :
unpack_gps_meas , size_gps_meas = dict_unpacker ( gps_measurement_report , True )
unpack_gps_meas_sv , size_gps_meas_sv = dict_unpacker ( gps_measurement_report_sv , True )
@ -229,28 +235,25 @@ def main() -> NoReturn:
unpack_position , _ = dict_unpacker ( position_report )
# wait for ModemManager to come up
cloudlog . warning ( " waiting for modem to come up " )
while True :
ret = subprocess . call ( " mmcli -m any --timeout 10 --command= \" AT+QGPS? \" " , stdout = subprocess . DEVNULL , stderr = subprocess . DEVNULL , shell = True )
if ret == 0 :
break
time . sleep ( 0.1 )
wait_for_modem ( )
# connect to modem
diag = ModemDiag ( )
def cleanup ( sig , frame ) :
cloudlog . warning ( f " caught sig { sig } , disabling quectel gps " )
assist_fetch_proc = None
def cleanup ( proc ) :
cloudlog . warning ( " caught sig disabling quectel gps " )
gpio_set ( GPIO . UBLOX_PWR_EN , False )
teardown_quectel ( diag )
cloudlog . warning ( " quectel cleanup done " )
sys . exit ( 0 )
signal . signal ( signal . SIGINT , cleanup )
signal . signal ( signal . SIGTERM , cleanup )
signal . signal ( signal . SIGINT , lambda sig , frame : cleanup ( assist_fetch_proc ) )
signal . signal ( signal . SIGTERM , lambda sig , frame : cleanup ( assist_fetch_proc ) )
# connect to modem
diag = ModemDiag ( )
download_assistance ( )
want_assistance = not os . path . exists ( ASSIST_DATA_FILE )
setup_quectel ( diag )
current_gps_time = utc_to_gpst ( GPSTime . from_datetime ( datetime . utcnow ( ) ) )
last_fetch_time = time . monotonic ( )
cloudlog . warning ( " quectel setup done " )
gpio_init ( GPIO . UBLOX_PWR_EN , True )
gpio_set ( GPIO . UBLOX_PWR_EN , True )
@ -258,6 +261,20 @@ def main() -> NoReturn:
pm = messaging . PubMaster ( [ ' qcomGnss ' , ' gpsLocation ' ] )
while 1 :
if os . path . exists ( ASSIST_DATA_FILE ) :
if want_assistance :
setup_quectel ( diag )
want_assistance = False
else :
os . remove ( ASSIST_DATA_FILE )
if want_assistance and time . monotonic ( ) - last_fetch_time > 10 :
if assist_fetch_proc is None or not assist_fetch_proc . is_alive ( ) : # type: ignore
cloudlog . warning ( " fetching assistance data " )
assist_fetch_proc = Process ( target = download_assistance )
assist_fetch_proc . start ( )
last_fetch_time = time . monotonic ( )
opcode , payload = diag . recv ( )
if opcode != DIAG_LOG_F :
cloudlog . error ( f " Unhandled opcode: { opcode } " )
@ -345,6 +362,8 @@ def main() -> NoReturn:
gps . speedAccuracy = math . sqrt ( sum ( [ x * * 2 for x in vNEDsigma ] ) )
# quectel gps verticalAccuracy is clipped to 500, set invalid if so
gps . flags = 1 if gps . verticalAccuracy != 500 else 0
if gps . flags :
want_assistance = False
pm . send ( ' gpsLocation ' , msg )