diff --git a/system/sensord/rawgps/rawgpsd.py b/system/sensord/rawgps/rawgpsd.py index 3e50090d0e..f588e6b153 100755 --- a/system/sensord/rawgps/rawgpsd.py +++ b/system/sensord/rawgps/rawgpsd.py @@ -8,6 +8,7 @@ import time import pycurl import subprocess from datetime import datetime +from multiprocessing import Process from typing import NoReturn, Optional from struct import unpack_from, calcsize, pack @@ -29,6 +30,8 @@ from system.sensord.rawgps.structs import (dict_unpacker, position_report, relis LOG_GNSS_OEMDRE_SVPOLY_REPORT) DEBUG = int(os.getenv("DEBUG", "0"))==1 +ASSIST_DATA_FILE = '/tmp/xtra3grc.bin' +ASSISTANCE_URL = 'http://xtrapath3.izatcloud.net/xtra3grc.bin' LOG_TYPES = [ LOG_GNSS_GPS_MEASUREMENT_REPORT, @@ -84,7 +87,7 @@ measurementStatusGlonassFields = { def try_setup_logs(diag, log_types): - for _ in range(5): + for _ in range(3): try: setup_logs(diag, log_types) break @@ -94,11 +97,12 @@ def try_setup_logs(diag, log_types): raise Exception(f"setup logs failed, {log_types=}") def at_cmd(cmd: str) -> Optional[str]: - for _ in range(5): + for _ in range(3): try: return subprocess.check_output(f"mmcli -m any --timeout 30 --command='{cmd}'", shell=True, encoding='utf8') except subprocess.CalledProcessError: cloudlog.exception("rawgps.mmcli_command_failed") + time.sleep(1.0) raise Exception(f"failed to execute mmcli command {cmd=}") @@ -109,53 +113,45 @@ def gps_enabled() -> bool: except subprocess.CalledProcessError as exc: raise Exception("failed to execute QGPS mmcli command") from exc -def download_and_inject_assistance(): - assist_data_file = '/tmp/xtra3grc.bin' - assistance_url = 'http://xtrapath3.izatcloud.net/xtra3grc.bin' - +def download_assistance(): try: - # download assistance - try: + c = pycurl.Curl() + c.setopt(pycurl.URL, ASSISTANCE_URL) + c.setopt(pycurl.NOBODY, 1) + c.setopt(pycurl.CONNECTTIMEOUT, 2) + c.perform() + bytes_n = c.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD) + c.close() + if bytes_n > 1e5: + cloudlog.error("Qcom assistance data larger than expected") + return + + with open(ASSIST_DATA_FILE, 'wb') as fp: c = pycurl.Curl() - c.setopt(pycurl.URL, assistance_url) - c.setopt(pycurl.NOBODY, 1) - c.setopt(pycurl.CONNECTTIMEOUT, 2) + c.setopt(pycurl.URL, ASSISTANCE_URL) + c.setopt(pycurl.CONNECTTIMEOUT, 5) + + c.setopt(pycurl.WRITEDATA, fp) c.perform() - bytes_n = c.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD) c.close() - if bytes_n > 1e5: - cloudlog.error("Qcom assistance data larger than expected") - return - - with open(assist_data_file, 'wb') as fp: - c = pycurl.Curl() - c.setopt(pycurl.URL, assistance_url) - c.setopt(pycurl.CONNECTTIMEOUT, 5) - - c.setopt(pycurl.WRITEDATA, fp) - c.perform() - c.close() - except pycurl.error: - cloudlog.exception("Failed to download assistance file") - return + except pycurl.error: + cloudlog.exception("Failed to download assistance file") + return - # inject into module - try: - cmd = f"mmcli -m any --timeout 30 --location-inject-assistance-data={assist_data_file}" - subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True) - cloudlog.info("successfully loaded assistance data") - except subprocess.CalledProcessError as e: - cloudlog.event( - "rawgps.assistance_loading_failed", - error=True, - cmd=e.cmd, - output=e.output, - returncode=e.returncode - ) - finally: - if os.path.exists(assist_data_file): - os.remove(assist_data_file) +def inject_assistance(): + try: + cmd = f"mmcli -m any --timeout 30 --location-inject-assistance-data={ASSIST_DATA_FILE}" + subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True) + cloudlog.info("successfully loaded assistance data") + except subprocess.CalledProcessError as e: + cloudlog.event( + "rawgps.assistance_loading_failed", + error=True, + cmd=e.cmd, + output=e.output, + returncode=e.returncode + ) def setup_quectel(diag: ModemDiag): # enable OEMDRE in the NV @@ -180,7 +176,8 @@ def setup_quectel(diag: ModemDiag): # Do internet assistance at_cmd("AT+QGPSXTRA=1") at_cmd("AT+QGPSSUPLURL=\"NULL\"") - download_and_inject_assistance() + if os.path.exists(ASSIST_DATA_FILE): + inject_assistance() #at_cmd("AT+QGPSXTRADATA?") time_str = datetime.utcnow().strftime("%Y/%m/%d,%H:%M:%S") at_cmd(f"AT+QGPSXTRATIME=0,\"{time_str}\",1,1,1000") @@ -214,6 +211,15 @@ def teardown_quectel(diag): try_setup_logs(diag, []) +def wait_for_modem(): + cloudlog.warning("waiting for modem to come up") + while True: + ret = subprocess.call("mmcli -m any --timeout 10 --command=\"AT+QGPS?\"", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True) + if ret == 0: + return + time.sleep(0.1) + + def main() -> NoReturn: unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True) unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True) @@ -229,28 +235,25 @@ def main() -> NoReturn: unpack_position, _ = dict_unpacker(position_report) - # wait for ModemManager to come up - cloudlog.warning("waiting for modem to come up") - while True: - ret = subprocess.call("mmcli -m any --timeout 10 --command=\"AT+QGPS?\"", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True) - if ret == 0: - break - time.sleep(0.1) + wait_for_modem() - # connect to modem - diag = ModemDiag() - - def cleanup(sig, frame): - cloudlog.warning(f"caught sig {sig}, disabling quectel gps") + assist_fetch_proc = None + def cleanup(proc): + cloudlog.warning("caught sig disabling quectel gps") gpio_set(GPIO.UBLOX_PWR_EN, False) teardown_quectel(diag) cloudlog.warning("quectel cleanup done") sys.exit(0) - signal.signal(signal.SIGINT, cleanup) - signal.signal(signal.SIGTERM, cleanup) + signal.signal(signal.SIGINT, lambda sig, frame: cleanup(assist_fetch_proc)) + signal.signal(signal.SIGTERM, lambda sig, frame: cleanup(assist_fetch_proc)) + # connect to modem + diag = ModemDiag() + download_assistance() + want_assistance = not os.path.exists(ASSIST_DATA_FILE) setup_quectel(diag) current_gps_time = utc_to_gpst(GPSTime.from_datetime(datetime.utcnow())) + last_fetch_time = time.monotonic() cloudlog.warning("quectel setup done") gpio_init(GPIO.UBLOX_PWR_EN, True) gpio_set(GPIO.UBLOX_PWR_EN, True) @@ -258,6 +261,20 @@ def main() -> NoReturn: pm = messaging.PubMaster(['qcomGnss', 'gpsLocation']) while 1: + if os.path.exists(ASSIST_DATA_FILE): + if want_assistance: + setup_quectel(diag) + want_assistance = False + else: + os.remove(ASSIST_DATA_FILE) + if want_assistance and time.monotonic() - last_fetch_time > 10: + if assist_fetch_proc is None or not assist_fetch_proc.is_alive(): # type: ignore + cloudlog.warning("fetching assistance data") + assist_fetch_proc = Process(target=download_assistance) + assist_fetch_proc.start() + last_fetch_time = time.monotonic() + + opcode, payload = diag.recv() if opcode != DIAG_LOG_F: cloudlog.error(f"Unhandled opcode: {opcode}") @@ -345,6 +362,8 @@ def main() -> NoReturn: gps.speedAccuracy = math.sqrt(sum([x**2 for x in vNEDsigma])) # quectel gps verticalAccuracy is clipped to 500, set invalid if so gps.flags = 1 if gps.verticalAccuracy != 500 else 0 + if gps.flags: + want_assistance = False pm.send('gpsLocation', msg) diff --git a/system/sensord/rawgps/test_rawgps.py b/system/sensord/rawgps/test_rawgps.py index 918c0e9f10..2132b77009 100755 --- a/system/sensord/rawgps/test_rawgps.py +++ b/system/sensord/rawgps/test_rawgps.py @@ -9,68 +9,61 @@ import numpy as np import cereal.messaging as messaging from system.hardware import TICI -from system.sensord.rawgps.rawgpsd import at_cmd +from system.sensord.rawgps.rawgpsd import at_cmd, wait_for_modem from selfdrive.manager.process_config import managed_processes from common.transformations.coordinates import ecef_from_geodetic GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", '0'))) -UPDATE_MS = 100 -UPDATES_PER_S = 1000//UPDATE_MS class TestRawgpsd(unittest.TestCase): @classmethod def setUpClass(cls): + os.system("sudo systemctl restart systemd-resolved") + os.system("sudo systemctl restart ModemManager lte") + wait_for_modem() if not TICI: raise unittest.SkipTest + cls.sm = messaging.SubMaster(['qcomGnss', 'gpsLocation', 'gnssMeasurements']) + + @classmethod + def tearDownClass(cls): + managed_processes['rawgpsd'].stop() + os.system("sudo systemctl restart systemd-resolved") + os.system("sudo systemctl restart ModemManager lte") - cls.sm_qcom_gnss = messaging.SubMaster(['qcomGnss']) - cls.sm_gps_location = messaging.SubMaster(['gpsLocation']) - cls.sm_gnss_measurements = messaging.SubMaster(['gnssMeasurements']) + def setUp(self): + at_cmd("AT+QGPSDEL=0") def tearDown(self): managed_processes['rawgpsd'].stop() + os.system("sudo systemctl restart systemd-resolved") def _wait_for_output(self, t=10): - self.sm_qcom_gnss.update(0) - for __ in range(t*UPDATES_PER_S): - self.sm_qcom_gnss.update(UPDATE_MS) - if self.sm_qcom_gnss.updated['qcomGnss']: - return True - - def _wait_for_location(self, t=10): - self.sm_gps_location.update(0) - for __ in range(t*UPDATES_PER_S): - self.sm_gps_location.update(UPDATE_MS) - if self.sm_gps_location.updated['gpsLocation'] and self.sm_gps_location['gpsLocation'].flags: - return True - return False - - def _wait_for_laikad_location(self, t=10): - self.sm_gnss_measurements.update(0) - for __ in range(t*UPDATES_PER_S): - self.sm_gnss_measurements.update(UPDATE_MS) - if self.sm_gnss_measurements.updated['gnssMeasurements'] and self.sm_gnss_measurements['gnssMeasurements'].positionECEF.valid: - return True - return False + time.sleep(t) + self.sm.update() + + def test_no_crash_double_command(self): + at_cmd("AT+QGPSDEL=0") + at_cmd("AT+QGPSDEL=0") def test_wait_for_modem(self): os.system("sudo systemctl stop ModemManager lte") managed_processes['rawgpsd'].start() - assert not self._wait_for_output(10) + self._wait_for_output(10) + assert not self.sm.updated['qcomGnss'] os.system("sudo systemctl restart ModemManager lte") - assert self._wait_for_output(30) + self._wait_for_output(30) + assert self.sm.updated['qcomGnss'] def test_startup_time(self): - for _ in range(5): + for i in range(2): + if i == 1: + os.system("sudo systemctl stop systemd-resolved") managed_processes['rawgpsd'].start() - - start_time = time.monotonic() - assert self._wait_for_output(), "rawgpsd didn't start outputting messages in time" - - et = time.monotonic() - start_time - assert et < 7, f"rawgpsd took {et:.1f}s to start" + self._wait_for_output(7) + assert self.sm.updated['qcomGnss'] managed_processes['rawgpsd'].stop() def test_turns_off_gnss(self): @@ -83,38 +76,63 @@ class TestRawgpsd(unittest.TestCase): loc_status = json.loads(ls) assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'} - def test_assistance_loading(self): - # clear assistance data - at_cmd("AT+QGPSDEL=0") - - managed_processes['rawgpsd'].start() - assert self._wait_for_output(10) - managed_processes['rawgpsd'].stop() + def check_assistance(self, should_be_loaded): # after QGPSDEL: '+QGPSXTRADATA: 0,"1980/01/05,19:00:00"' # after loading: '+QGPSXTRADATA: 10080,"2023/06/24,19:00:00"' out = at_cmd("AT+QGPSXTRADATA?") out = out.split("+QGPSXTRADATA:")[1].split("'")[0].strip() valid_duration, injected_time_str = out.split(",", 1) - assert valid_duration == "10080" # should be max time - injected_time = datetime.datetime.strptime(injected_time_str.replace("\"", ""), "%Y/%m/%d,%H:%M:%S") - self.assertLess(abs((datetime.datetime.utcnow() - injected_time).total_seconds()), 60*60*12) + if should_be_loaded: + assert valid_duration == "10080" # should be max time + injected_time = datetime.datetime.strptime(injected_time_str.replace("\"", ""), "%Y/%m/%d,%H:%M:%S") + self.assertLess(abs((datetime.datetime.utcnow() - injected_time).total_seconds()), 60*60*12) + else: + valid_duration, injected_time_str = out.split(",", 1) + injected_time_str = injected_time_str.replace('\"', '').replace('\'', '') + assert injected_time_str[:] == '1980/01/05,19:00:00'[:] + assert valid_duration == '0' + + def test_assistance_loading(self): + managed_processes['rawgpsd'].start() + self._wait_for_output(10) + assert self.sm.updated['qcomGnss'] + managed_processes['rawgpsd'].stop() + self.check_assistance(True) + + def test_no_assistance_loading(self): + os.system("sudo systemctl stop systemd-resolved") + + managed_processes['rawgpsd'].start() + self._wait_for_output(10) + assert self.sm.updated['qcomGnss'] + managed_processes['rawgpsd'].stop() + self.check_assistance(False) + + def test_late_assistance_loading(self): + os.system("sudo systemctl stop systemd-resolved") + + managed_processes['rawgpsd'].start() + self._wait_for_output(17) + assert self.sm.updated['qcomGnss'] + os.system("sudo systemctl restart systemd-resolved") + self._wait_for_output(15) + managed_processes['rawgpsd'].stop() + self.check_assistance(True) @unittest.skipIf(not GOOD_SIGNAL, "No good GPS signal") def test_fix(self): - # clear assistance data - at_cmd("AT+QGPSDEL=0") - managed_processes['rawgpsd'].start() managed_processes['laikad'].start() - assert self._wait_for_location(120) - assert self.sm_gps_location['gpsLocation'].flags == 1 - module_fix = ecef_from_geodetic([self.sm_gps_location['gpsLocation'].latitude, - self.sm_gps_location['gpsLocation'].longitude, - self.sm_gps_location['gpsLocation'].altitude]) - assert self._wait_for_laikad_location(90) - total_diff = np.array(self.sm_gnss_measurements['gnssMeasurements'].positionECEF.value) - module_fix - print(total_diff) + assert self._wait_for_output(60) + assert self.sm.updated['qcomGnss'] + assert self.sm.updated['gpsLocation'] + assert self.sm['gpsLocation'].flags == 1 + module_fix = ecef_from_geodetic([self.sm['gpsLocation'].latitude, + self.sm['gpsLocation'].longitude, + self.sm['gpsLocation'].altitude]) + assert self.sm['gnssMeasurements'].positionECEF.valid + total_diff = np.array(self.sm['gnssMeasurements'].positionECEF.value) - module_fix self.assertLess(np.linalg.norm(total_diff), 100) managed_processes['laikad'].stop() managed_processes['rawgpsd'].stop()