import os
import pytest
import json
import time
import datetime
import subprocess
import cereal . messaging as messaging
from openpilot . system . qcomgpsd . qcomgpsd import at_cmd , wait_for_modem
from openpilot . system . manager . process_config import managed_processes
# Opt-in flag read once at import time from the GOOD_SIGNAL environment
# variable: "0" (the default) -> False, any other integer string -> True.
# The variable name and default must not carry padding spaces, otherwise the
# lookup can never match a variable that is actually named GOOD_SIGNAL.
GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", "0")))
# NOTE(review): `tici` is a custom pytest marker; presumably it restricts these
# tests to on-device runs -- confirm against the project's pytest config.
@pytest.mark.tici
class TestRawgpsd:
    """Integration tests for the ``qcomgpsd`` managed process.

    The tests start/stop the real ``qcomgpsd`` process, talk to the modem with
    AT commands and restart system services through ``sudo systemctl``, so they
    need real hardware and sudo rights. Stopping ``systemd-resolved`` is how
    the tests take name resolution (i.e. "internet") away from the process.
    """

    @classmethod
    def setup_class(cls):
        """Start DNS, restart the modem services and wait for the modem."""
        os.system("sudo systemctl start systemd-resolved")
        os.system("sudo systemctl restart ModemManager lte")
        wait_for_modem()

    @classmethod
    def teardown_class(cls):
        """Stop ``qcomgpsd`` and restart the services the tests tampered with."""
        managed_processes['qcomgpsd'].stop()
        os.system("sudo systemctl restart systemd-resolved")
        os.system("sudo systemctl restart ModemManager lte")

    def setup_method(self):
        """Issue ``AT+QGPSDEL=0`` and create a fresh subscriber for each test.

        NOTE(review): judging by the sample responses in ``check_assistance``,
        QGPSDEL resets the modem's assistance data -- confirm in the modem docs.
        """
        at_cmd("AT+QGPSDEL=0")
        self.sm = messaging.SubMaster(['qcomGnss', 'gpsLocation', 'gnssMeasurements'])

    def teardown_method(self):
        """Stop ``qcomgpsd`` and bring DNS back after every test."""
        managed_processes['qcomgpsd'].stop()
        os.system("sudo systemctl restart systemd-resolved")

    def _wait_for_output(self, t):
        """Poll for a ``qcomGnss`` update for roughly ``t`` seconds.

        Args:
            t: Integer number of seconds to keep polling (0.1 s per poll).

        Returns:
            The ``updated`` flag of ``qcomGnss`` after the last poll: truthy as
            soon as a message arrived, falsy if the time budget ran out.
        """
        dt = 0.1
        for _ in range(t * int(1 / dt)):
            self.sm.update(0)
            if self.sm.updated['qcomGnss']:
                break
            time.sleep(dt)
        return self.sm.updated['qcomGnss']

    def test_no_crash_double_command(self):
        """Sending the same AT command twice in a row must not raise."""
        at_cmd("AT+QGPSDEL=0")
        at_cmd("AT+QGPSDEL=0")

    def test_wait_for_modem(self):
        """No output while ModemManager is stopped; output once it is back."""
        os.system("sudo systemctl stop ModemManager")
        managed_processes['qcomgpsd'].start()
        assert not self._wait_for_output(5)

        os.system("sudo systemctl restart ModemManager")
        assert self._wait_for_output(30)

    def test_startup_time(self, subtests):
        """``qcomGnss`` must appear within 7 s of start, with and without DNS."""
        for internet in (True, False):
            if not internet:
                os.system("sudo systemctl stop systemd-resolved")
            with subtests.test(internet=internet):
                managed_processes['qcomgpsd'].start()
                assert self._wait_for_output(7)
                managed_processes['qcomgpsd'].stop()

    def test_turns_off_gnss(self, subtests):
        """After ``qcomgpsd`` stops, at most '3gpp-lac-ci' location stays enabled."""
        for s in (0.1, 1, 5):
            with subtests.test(runtime=s):
                managed_processes['qcomgpsd'].start()
                time.sleep(s)
                managed_processes['qcomgpsd'].stop()

                # Constant command: run it as an argument list, no shell needed.
                ls = subprocess.check_output(
                    ["mmcli", "-m", "any", "--location-status", "--output-json"],
                    encoding='utf-8',
                )
                loc_status = json.loads(ls)
                assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'}

    def check_assistance(self, should_be_loaded):
        """Assert the modem's XTRA assistance state via ``AT+QGPSXTRADATA?``.

        Args:
            should_be_loaded: When true, expect the maximum validity (10080) and
                an injection time within 12 hours of the current UTC time.
                Otherwise expect the reset state (validity 0, 1980 timestamp).

        Raises:
            AssertionError: If the response is malformed or the state differs
                from the expectation.
        """
        # after QGPSDEL: '+QGPSXTRADATA: 0,"1980/01/05,19:00:00"'
        # after loading: '+QGPSXTRADATA: 10080,"2023/06/24,19:00:00"'
        prefix = "+QGPSXTRADATA:"
        out = at_cmd("AT+QGPSXTRADATA?")
        assert prefix in out, f"unexpected response to AT+QGPSXTRADATA?: {out!r}"

        # Keep what follows the prefix, up to the first single quote.
        payload = out.split(prefix)[1].split("'")[0].strip()
        assert "," in payload, f"malformed XTRA status: {payload!r}"
        valid_duration, injected_time_str = payload.split(",", 1)
        # Only double quotes can remain here: the payload was already cut at
        # the first single quote above.
        injected_time_str = injected_time_str.replace('"', '')

        if should_be_loaded:
            assert valid_duration == "10080"  # should be max time
            injected_time = datetime.datetime.strptime(injected_time_str, "%Y/%m/%d,%H:%M:%S")
            # The parsed timestamp is naive, so compare against naive UTC "now".
            now_utc = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
            max_age_s = 60 * 60 * 12
            assert abs((now_utc - injected_time).total_seconds()) < max_age_s
        else:
            assert injected_time_str == '1980/01/05,19:00:00'
            assert valid_duration == '0'

    def test_assistance_loading(self):
        """With DNS available, assistance data is loaded once output appears."""
        managed_processes['qcomgpsd'].start()
        assert self._wait_for_output(10)
        managed_processes['qcomgpsd'].stop()
        self.check_assistance(True)

    def test_no_assistance_loading(self):
        """Without DNS, output still appears but assistance stays unloaded."""
        os.system("sudo systemctl stop systemd-resolved")

        managed_processes['qcomgpsd'].start()
        assert self._wait_for_output(10)
        managed_processes['qcomgpsd'].stop()
        self.check_assistance(False)

    def test_late_assistance_loading(self):
        """Assistance data gets loaded when DNS returns after startup."""
        os.system("sudo systemctl stop systemd-resolved")

        managed_processes['qcomgpsd'].start()
        assert self._wait_for_output(17)

        # Restore DNS, then leave the process 15 s before checking the modem.
        os.system("sudo systemctl restart systemd-resolved")
        time.sleep(15)
        managed_processes['qcomgpsd'].stop()
        self.check_assistance(True)