diff --git a/tools/gpstest/fuzzy_testing.py b/tools/gpstest/fuzzy_testing.py index 216e7d0dde..df6691c558 100755 --- a/tools/gpstest/fuzzy_testing.py +++ b/tools/gpstest/fuzzy_testing.py @@ -66,7 +66,7 @@ def get_continuous_coords(lat, lon) -> Tuple[int, int]: return round(lat, 5), round(lon, 5) rc_p: Any = None -def exec_remote_checker(lat, lon, duration): +def exec_remote_checker(lat, lon, duration, ip_addr): global rc_p # TODO: good enough for testing remote_cmd = "export PYTHONPATH=/data/pythonpath:/data/pythonpath/pyextra && " @@ -74,8 +74,8 @@ def exec_remote_checker(lat, lon, duration): remote_cmd += f"timeout {duration} /usr/local/pyenv/shims/python tools/gpstest/remote_checker.py " remote_cmd += f"{lat} {lon}" - ssh_cmd = ['ssh', '-i', '/home/batman/openpilot/xx/phone/key/id_rsa', - 'comma@192.168.60.130'] + ssh_cmd = ["ssh", "-i", "/home/batman/openpilot/xx/phone/key/id_rsa", + f"comma@{ip_addr}"] ssh_cmd += [remote_cmd] rc_p = sp.Popen(ssh_cmd, stdout=sp.PIPE) @@ -84,8 +84,9 @@ def exec_remote_checker(lat, lon, duration): print(f"Checker Result: {rc_output.strip().decode('utf-8')}") -def run_remote_checker(spoof_proc, lat, lon, duration) -> bool: - checker_thread = threading.Thread(target=exec_remote_checker, args=(lat, lon, duration)) +def run_remote_checker(spoof_proc, lat, lon, duration, ip_addr) -> bool: + checker_thread = threading.Thread(target=exec_remote_checker, + args=(lat, lon, duration, ip_addr)) checker_thread.start() tcnt = 0 @@ -107,8 +108,12 @@ def run_remote_checker(spoof_proc, lat, lon, duration) -> bool: def main(): + if len(sys.argv) < 2: + print(f"usage: {sys.argv[0]} [-c]") + ip_addr = sys.argv[1] + continuous_mode = False - if len(sys.argv) == 2 and sys.argv[1] == '-c': + if len(sys.argv) == 3 and sys.argv[2] == '-c': print("Continuous Mode!") continuous_mode = True @@ -123,7 +128,7 @@ def main(): start_time = time.monotonic() # remote checker runs blocking - if not run_remote_checker(spoof_proc, lat, lon, duration): + if not run_remote_checker(spoof_proc, lat, lon, duration, ip_addr): # location could not be matched by ublox module pass diff --git a/tools/gpstest/remote_checker.py b/tools/gpstest/remote_checker.py index 84f6c0c3d9..a649a105c3 100644 --- a/tools/gpstest/remote_checker.py +++ b/tools/gpstest/remote_checker.py @@ -3,6 +3,7 @@ import sys import time from typing import List +from common.params import Params import cereal.messaging as messaging from selfdrive.manager.process_config import managed_processes @@ -12,30 +13,35 @@ procs: List[str] = []#"ubloxd", "pigeond"] def main(): - if len(sys.argv) < 3: + if len(sys.argv) != 4: print("args: ") return - sol_lat = float(sys.argv[1]) - sol_lon = float(sys.argv[2]) + quectel_mod = Params().get_bool("UbloxAvailable") + sol_lat = float(sys.argv[2]) + sol_lon = float(sys.argv[3]) for p in procs: managed_processes[p].start() time.sleep(0.5) # give time to startup - gps_sock = messaging.sub_sock('gpsLocationExternal', timeout=0.1) + socket = 'gpsLocation' if quectel_mod else 'gpsLocationExternal' + gps_sock = messaging.sub_sock(socket, timeout=0.1) # analyze until the location changed while True: events = messaging.drain_sock(gps_sock) for e in events: - lat = e.gpsLocationExternal.latitude - lon = e.gpsLocationExternal.longitude + loc = e.gpsLocation if quectel_mod else e.gpsLocationExternal + lat = loc.latitude + lon = loc.longitude if abs(lat - sol_lat) < DELTA and abs(lon - sol_lon) < DELTA: print("MATCH") return + time.sleep(0.1) + for p in procs: if not managed_processes[p].proc.is_alive(): print(f"ERROR: '{p}' died") diff --git a/tools/gpstest/run_static_gps_signal.py b/tools/gpstest/run_static_gps_signal.py deleted file mode 100755 index 3787038f13..0000000000 --- a/tools/gpstest/run_static_gps_signal.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys -import random -import datetime as dt -import subprocess as sp -from typing import Tuple - -from laika.downloader import download_nav -from laika.gps_time import GPSTime -from laika.helpers import ConstellationId - -cache_dir = '/tmp/gpstest/' - - -def download_rinex(): - # TODO: check if there is a better way to get the full brdc file for LimeGPS - gps_time = GPSTime.from_datetime(dt.datetime.utcnow()) - utc_time = dt.datetime.utcnow() - dt.timedelta(1) - gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day)) - return download_nav(gps_time, cache_dir, ConstellationId.GPS) - - -def get_random_coords(lat, lon) -> Tuple[int, int]: - # jump around the world - # max values, lat: -90 to 90, lon: -180 to 180 - - lat_add = random.random()*20 + 10 - lon_add = random.random()*20 + 20 - - lat = ((lat + lat_add + 90) % 180) - 90 - lon = ((lon + lon_add + 180) % 360) - 180 - return round(lat, 5), round(lon, 5) - - -def check_availability() -> bool: - cmd = ["LimeSuite/builddir/LimeUtil/LimeUtil", "--find"] - output = sp.check_output(cmd) - - if output.strip() == b"": - return False - - print(f"Device: {output.strip().decode('utf-8')}") - return True - - -def main(): - if not os.path.exists('LimeGPS'): - print("LimeGPS not found run 'setup.sh' first") - return - - if not os.path.exists('LimeSuite'): - print("LimeSuite not found run 'setup.sh' first") - return - - if not check_availability(): - print("No limeSDR device found!") - return - - rinex_file = download_rinex() - lat, lon = get_random_coords(47.2020, 15.7403) - - if len(sys.argv) == 3: - lat = float(sys.argv[1]) - lon = float(sys.argv[2]) - - try: - print(f"starting LimeGPS, Location: {lat},{lon}") - cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", f"{lat},{lon},100"] - sp.check_output(cmd, stderr=sp.PIPE) - except KeyboardInterrupt: - print("stopping LimeGPS") - except Exception as e: - out_stderr = e.stderr.decode('utf-8')# pylint:disable=no-member - if "Device is busy." in out_stderr: - print("GPS simulation is already running, Device is busy!") - return - - print(f"LimeGPS crashed: {str(e)}") - print(f"stderr:\n{e.stderr.decode('utf-8')}")# pylint:disable=no-member - -if __name__ == "__main__": - main() diff --git a/tools/gpstest/run_unittest.sh b/tools/gpstest/run_unittest.sh new file mode 100755 index 0000000000..d284fa74e5 --- /dev/null +++ b/tools/gpstest/run_unittest.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +# NOTE: can only run inside limeGPS test box! + +# run limeGPS with random static location +timeout 300 ./simulate_gps_signal.py & +gps_PID=$? + +echo "starting limeGPS..." +sleep 10 + +# run unit tests (skipped when module not present) +python -m unittest test_gps.py +python -m unittest test_gps_qcom.py + +kill $gps_PID diff --git a/tools/gpstest/simulate_gps_signal.py b/tools/gpstest/simulate_gps_signal.py new file mode 100755 index 0000000000..f1e5ad2028 --- /dev/null +++ b/tools/gpstest/simulate_gps_signal.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +import os +import random +import argparse +import datetime as dt +import subprocess as sp +from typing import Tuple + +from laika.downloader import download_nav +from laika.gps_time import GPSTime +from laika.helpers import ConstellationId + +cache_dir = '/tmp/gpstest/' + + +def download_rinex(): + # TODO: check if there is a better way to get the full brdc file for LimeGPS + gps_time = GPSTime.from_datetime(dt.datetime.utcnow()) + utc_time = dt.datetime.utcnow() - dt.timedelta(1) + gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day)) + return download_nav(gps_time, cache_dir, ConstellationId.GPS) + +def get_coords(lat, lon, s1, s2, o1=0, o2=0) -> Tuple[int, int]: + lat_add = random.random()*s1 + o1 + lon_add = random.random()*s2 + o2 + + lat = ((lat + lat_add + 90) % 180) - 90 + lon = ((lon + lon_add + 180) % 360) - 180 + return round(lat, 5), round(lon, 5) + +def get_continuous_coords(lat, lon) -> Tuple[int, int]: + # continuously move around the world + return get_coords(lat, lon, 0.01, 0.01) + +def get_random_coords(lat, lon) -> Tuple[int, int]: + # jump around the world + return get_coords(lat, lon, 20, 20, 10, 20) + +def check_availability() -> bool: + cmd = ["LimeSuite/builddir/LimeUtil/LimeUtil", "--find"] + output = sp.check_output(cmd) + + if output.strip() == b"": + return False + + print(f"Device: {output.strip().decode('utf-8')}") + return True + +def main(lat, lon, jump_sim, contin_sim): + if not os.path.exists('LimeGPS'): + print("LimeGPS not found run 'setup.sh' first") + return + + if not os.path.exists('LimeSuite'): + print("LimeSuite not found run 'setup.sh' first") + return + + if not check_availability(): + print("No limeSDR device found!") + return + + rinex_file = download_rinex() + + if lat == 0 and lon == 0: + lat, lon = get_random_coords(47.2020, 15.7403) + + timeout = None + if jump_sim: + timeout = 30 + + while True: + try: + print(f"starting LimeGPS, Location: {lat},{lon}") + cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", f"{lat},{lon},100"] + sp.check_output(cmd, stderr=sp.PIPE, timeout=timeout) + except KeyboardInterrupt: + print("stopping LimeGPS") + return + except sp.TimeoutExpired: + print("LimeGPS timeout reached!") + except Exception as e: + out_stderr = e.stderr.decode('utf-8')# pylint:disable=no-member + if "Device is busy." in out_stderr: + print("GPS simulation is already running, Device is busy!") + return + + print(f"LimeGPS crashed: {str(e)}") + print(f"stderr:\n{e.stderr.decode('utf-8')}")# pylint:disable=no-member + + if contin_sim: + lat, lon = get_continuous_coords(lat, lon) + else: + lat, lon = get_random_coords(lat, lon) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Simulate static [or random jumping] GPS signal.") + parser.add_argument("lat", type=float, nargs='?', default=0) + parser.add_argument("lon", type=float, nargs='?', default=0) + parser.add_argument("--jump", action="store_true", help="signal that jumps around the world") + parser.add_argument("--contin", action="store_true", help="continuously/slowly moving around the world") + args = parser.parse_args() + main(args.lat, args.lon, args.jump, args.contin) diff --git a/tools/gpstest/test_gps.py b/tools/gpstest/test_gps.py index f5e19372f7..b5d0cdd254 100644 --- a/tools/gpstest/test_gps.py +++ b/tools/gpstest/test_gps.py @@ -2,8 +2,8 @@ import time import unittest import struct -import numpy as np +from common.params import Params import cereal.messaging as messaging import selfdrive.sensord.pigeond as pd from system.hardware import TICI @@ -22,7 +22,20 @@ def read_events(service, duration_sec): return events -def verify_ubloxgnss_data(socket: messaging.SubSocket): +def create_backup(pigeon): + # controlled GNSS stop + pigeon.send(b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74") + + # store almanac in flash + pigeon.send(b"\xB5\x62\x09\x14\x04\x00\x00\x00\x00\x00\x21\xEC") + try: + if not pigeon.wait_for_ack(ack=pd.UBLOX_SOS_ACK, nack=pd.UBLOX_SOS_NACK): + assert False, "Could not store almanac" + except TimeoutError: + pass + + +def verify_ubloxgnss_data(socket: messaging.SubSocket, max_time: int): start_time = 0 end_time = 0 events = messaging.drain_sock(socket) @@ -42,7 +55,7 @@ def verify_ubloxgnss_data(socket: messaging.SubSocket): assert end_time != 0, "no ublox measurements received!" ttfm = (end_time - start_time)/1e9 - assert ttfm < 35, f"Time to first measurement > 35s, {ttfm}" + assert ttfm < max_time, f"Time to first measurement > {max_time}s, {ttfm}" # check for satellite count in measurements sat_count = [] @@ -52,42 +65,31 @@ def verify_ubloxgnss_data(socket: messaging.SubSocket): sat_count.append(event.ubloxGnss.measurementReport.numMeas) num_sat = int(sum(sat_count)/len(sat_count)) - assert num_sat > 8, f"Not enough satellites {num_sat} (TestBox setup!)" + assert num_sat > 5, f"Not enough satellites {num_sat} (TestBox setup!)" -def verify_gps_location(socket: messaging.SubSocket): - buf_lon = [0]*10 - buf_lat = [0]*10 - buf_i = 0 +def verify_gps_location(socket: messaging.SubSocket, max_time: int): events = messaging.drain_sock(socket) assert len(events) != 0, "no gpsLocationExternal measurements" start_time = events[0].logMonoTime end_time = 0 for event in events: - buf_lon[buf_i % 10] = event.gpsLocationExternal.longitude - buf_lat[buf_i % 10] = event.gpsLocationExternal.latitude - buf_i += 1 - - if buf_i < 9: - continue + gps_valid = event.gpsLocationExternal.flags % 2 - if any([lat == 0 or lon == 0 for lat,lon in zip(buf_lat, buf_lon)]): - continue - - if np.std(buf_lon) < 1e-5 and np.std(buf_lat) < 1e-5: + if gps_valid: end_time = event.logMonoTime break assert end_time != 0, "GPS location never converged!" ttfl = (end_time - start_time)/1e9 - assert ttfl < 40, f"Time to first location > 40s, {ttfl}" + assert ttfl < max_time, f"Time to first location > {max_time}s, {ttfl}" hacc = events[-1].gpsLocationExternal.accuracy vacc = events[-1].gpsLocationExternal.verticalAccuracy - assert hacc < 15, f"Horizontal accuracy too high, {hacc}" - assert vacc < 43, f"Vertical accuracy too high, {vacc}" + assert hacc < 20, f"Horizontal accuracy too high, {hacc}" + assert vacc < 45, f"Vertical accuracy too high, {vacc}" def verify_time_to_first_fix(pigeon): @@ -111,11 +113,16 @@ class TestGPS(unittest.TestCase): if not TICI: raise unittest.SkipTest + ublox_available = Params().get_bool("UbloxAvailable") + if not ublox_available: + raise unittest.SkipTest + + def tearDown(self): pd.set_power(False) @with_processes(['ubloxd']) - def test_ublox_reset(self): + def test_a_ublox_reset(self): pigeon, pm = pd.create_pigeon() pd.init_baudrate(pigeon) @@ -127,14 +134,58 @@ class TestGPS(unittest.TestCase): gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1) # receive some messages (restart after cold start takes up to 30seconds) - pd.run_receiving(pigeon, pm, 40) + pd.run_receiving(pigeon, pm, 60) + + # store almanac for next test + create_backup(pigeon) - verify_ubloxgnss_data(ugs) - verify_gps_location(gle) + verify_ubloxgnss_data(ugs, 60) + verify_gps_location(gle, 60) # skip for now, this might hang for a while #verify_time_to_first_fix(pigeon) + @with_processes(['ubloxd']) + def test_b_ublox_almanac(self): + pigeon, pm = pd.create_pigeon() + pd.init_baudrate(pigeon) + + # device cold start + pigeon.send(b"\xb5\x62\x06\x04\x04\x00\xff\xff\x00\x00\x0c\x5d") + time.sleep(1) # wait for cold start + pd.init_baudrate(pigeon) + + # clear configuration + pigeon.send_with_ack(b"\xb5\x62\x06\x09\x0d\x00\x00\x00\x1f\x1f\x00\x00\x00\x00\x00\x00\x00\x00\x17\x71\x5b") + + # restoring almanac backup + pigeon.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60") + status = pigeon.wait_for_backup_restore_status() + assert status == 2, "Could not restore almanac backup" + + pd.initialize_pigeon(pigeon) + + ugs = messaging.sub_sock("ubloxGnss", timeout=0.1) + gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1) + + pd.run_receiving(pigeon, pm, 15) + verify_ubloxgnss_data(ugs, 15) + verify_gps_location(gle, 20) + + + @with_processes(['ubloxd']) + def test_c_ublox_startup(self): + pigeon, pm = pd.create_pigeon() + pd.init_baudrate(pigeon) + pd.initialize_pigeon(pigeon) + + ugs = messaging.sub_sock("ubloxGnss", timeout=0.1) + gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1) + pd.run_receiving(pigeon, pm, 10) + verify_ubloxgnss_data(ugs, 10) + verify_gps_location(gle, 10) + + if __name__ == "__main__": unittest.main() \ No newline at end of file diff --git a/tools/gpstest/test_gps_qcom.py b/tools/gpstest/test_gps_qcom.py new file mode 100644 index 0000000000..0909316c5e --- /dev/null +++ b/tools/gpstest/test_gps_qcom.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +import time +import unittest +import subprocess as sp + +from common.params import Params +from system.hardware import TICI +import cereal.messaging as messaging +from selfdrive.manager.process_config import managed_processes + + +def exec_mmcli(cmd): + cmd = "mmcli -m 0 " + cmd + p = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE) + return p.communicate() + + +def wait_for_location(socket, timeout): + while True: + events = messaging.drain_sock(socket) + for event in events: + if event.gpsLocation.flags % 2: + return False + + timeout -= 1 + if timeout <= 0: + return True + + time.sleep(0.1) + continue + + +class TestGPS(unittest.TestCase): + @classmethod + def setUpClass(cls): + if not TICI: + raise unittest.SkipTest + + ublox_available = Params().get_bool("UbloxAvailable") + if ublox_available: + raise unittest.SkipTest + + @unittest.skip("Skip cold start test due to time") + def test_quectel_cold_start(self): + # delete assistance data to enforce cold start for GNSS + # testing shows that this takes up to 20min + + # invalidate supl setting, cannot be reset + _, err = exec_mmcli("--location-set-supl-server=unittest:1") + + _, err = exec_mmcli("--command='AT+QGPSDEL=0'") + assert len(err) == 0, f"GPSDEL failed: {err}" + + managed_processes['rawgpsd'].start() + start_time = time.monotonic() + glo = messaging.sub_sock("gpsLocation", timeout=0.1) + + timeout = 10*60*25 # 25 minute + timedout = wait_for_location(glo, timeout) + managed_processes['rawgpsd'].stop() + + assert timedout is False, "Waiting for location timed out (25min)!" + + duration = time.monotonic() - start_time + assert duration < 50, f"Received GPS location {duration}!" + + + def test_a_quectel_cold_start_AGPS(self): + _, err = exec_mmcli("--command='AT+QGPSDEL=0'") + assert len(err) == 0, f"GPSDEL failed: {err}" + + # setup AGPS + exec_mmcli("--location-set-supl-server=supl.google.com:7276") + + managed_processes['rawgpsd'].start() + start_time = time.monotonic() + glo = messaging.sub_sock("gpsLocation", timeout=0.1) + + timeout = 10*60*3 # 3 minute + timedout = wait_for_location(glo, timeout) + managed_processes['rawgpsd'].stop() + + assert timedout is False, "Waiting for location timed out (3min)!" + + duration = time.monotonic() - start_time + assert duration < 60, f"Received GPS location {duration}!" + + + def test_b_quectel_startup(self): + + # setup AGPS + exec_mmcli("--location-set-supl-server=supl.google.com:7276") + + managed_processes['rawgpsd'].start() + start_time = time.monotonic() + glo = messaging.sub_sock("gpsLocation", timeout=0.1) + + timeout = 10*60*3 # 3 minute + timedout = wait_for_location(glo, timeout) + managed_processes['rawgpsd'].stop() + + assert timedout is False, "Waiting for location timed out (3min)!" + + duration = time.monotonic() - start_time + assert duration < 60, f"Received GPS location {duration}!" + + +if __name__ == "__main__": + unittest.main()