Improve GPSbox tests (#26983)

* add altitude testing

* fix proc check

Co-authored-by: Kurt Nistelberger <kurt.nistelberger@gmail.com>
old-commit-hash: a43a243b6e
beeps
Kurt Nistelberger 2 years ago committed by GitHub
parent f1f5f15f12
commit 8d5e71eb8e
  1. 7
      tools/gpstest/README.md
  2. 204
      tools/gpstest/fuzzy_testing.py
  3. 53
      tools/gpstest/helper.py
  4. 54
      tools/gpstest/remote_checker.py
  5. 185
      tools/gpstest/rpc_server.py
  6. 25
      tools/gpstest/simulate_gps_signal.py

@ -3,12 +3,9 @@ Testing the GPS receiver using GPS spoofing. At the moment only
static location relpay is supported.
# Usage
```
# on host, start gps signal simulation
./run_static_lime.py
```
on C3 run `rpc_server.py`, on host PC run `fuzzy_testing.py`
`run_static_lime.py` downloads the latest ephemeris file from
`simulate_gps_signal.py` downloads the latest ephemeris file from
https://cddis.nasa.gov/archive/gnss/data/daily/20xx/brdc/.

@ -1,147 +1,115 @@
#!/usr/bin/env python3
import sys
import time
import random
import datetime as dt
import subprocess as sp
import argparse
import multiprocessing
import threading
from typing import Tuple, Any
import rpyc # pylint: disable=import-error
from collections import defaultdict
from laika.downloader import download_nav
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
from helper import download_rinex, exec_LimeGPS_bin
from helper import get_random_coords, get_continuous_coords
cache_dir = '/tmp/gpstest/'
#------------------------------------------------------------------------------
# this script is supposed to run on HOST PC
# limeSDR is unreliable via c3 USB
#------------------------------------------------------------------------------
def download_rinex():
# TODO: check if there is a better way to get the full brdc file for LimeGPS
gps_time = GPSTime.from_datetime(dt.datetime.utcnow())
utc_time = dt.datetime.utcnow() - dt.timedelta(1)
gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day))
return download_nav(gps_time, cache_dir, ConstellationId.GPS)
def exec_LimeGPS_bin(rinex_file: str, location: str, duration: int):
# this functions should never return, cause return means, timeout is
# reached or it crashed
try:
cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", location]
sp.check_output(cmd, timeout=duration)
except sp.TimeoutExpired:
print("LimeGPS timeout reached!")
except Exception as e:
print(f"LimeGPS crashed: {str(e)}")
def run_lime_gps(rinex_file: str, location: str, duration: int):
print(f"LimeGPS {location} {duration}")
def run_lime_gps(rinex_file: str, location: str, timeout: int):
# needs to run longer than the checker
timeout += 10
print(f"LimeGPS {location} {timeout}")
p = multiprocessing.Process(target=exec_LimeGPS_bin,
args=(rinex_file, location, duration))
args=(rinex_file, location, timeout))
p.start()
return p
con = None
def run_remote_checker(lat, lon, alt, duration, ip_addr):
global con
try:
con = rpyc.connect(ip_addr, 18861)
con._config['sync_request_timeout'] = duration+20
except ConnectionRefusedError:
print("could not run remote checker is 'rpc_server.py' running???")
return False, None, None
def get_random_coords(lat, lon) -> Tuple[int, int]:
# jump around the world
# max values, lat: -90 to 90, lon: -180 to 180
lat_add = random.random()*20 + 10
lon_add = random.random()*20 + 20
lat = ((lat + lat_add + 90) % 180) - 90
lon = ((lon + lon_add + 180) % 360) - 180
return round(lat, 5), round(lon, 5)
def get_continuous_coords(lat, lon) -> Tuple[int, int]:
# continuously move around the world
lat_add = random.random()*0.01
lon_add = random.random()*0.01
lat = ((lat + lat_add + 90) % 180) - 90
lon = ((lon + lon_add + 180) % 360) - 180
return round(lat, 5), round(lon, 5)
rc_p: Any = None
def exec_remote_checker(lat, lon, duration, ip_addr):
global rc_p
# TODO: good enough for testing
remote_cmd = "export PYTHONPATH=/data/pythonpath && "
remote_cmd += "cd /data/openpilot && "
remote_cmd += f"timeout {duration} /usr/local/pyenv/shims/python tools/gpstest/remote_checker.py "
remote_cmd += f"{lat} {lon}"
ssh_cmd = ["ssh", "-i", "/home/batman/openpilot/xx/phone/key/id_rsa",
f"comma@{ip_addr}"]
ssh_cmd += [remote_cmd]
rc_p = sp.Popen(ssh_cmd, stdout=sp.PIPE)
rc_p.wait()
rc_output = rc_p.stdout.read()
print(f"Checker Result: {rc_output.strip().decode('utf-8')}")
matched, log, info = con.root.exposed_run_checker(lat, lon, alt,
timeout=duration,
use_laikad=True)
con.close() # TODO: might wanna fetch more logs here
con = None
print(f"Remote Checker: {log} {info}")
return matched, log, info
def run_remote_checker(spoof_proc, lat, lon, duration, ip_addr) -> bool:
checker_thread = threading.Thread(target=exec_remote_checker,
args=(lat, lon, duration, ip_addr))
checker_thread.start()
tcnt = 0
while True:
if not checker_thread.is_alive():
# assume this only happens when the signal got matched
return True
stats = defaultdict(int) # type: ignore
keys = ['success', 'failed', 'ublox_fail', 'laikad_fail', 'proc_crash', 'checker_crash']
# the spoofing process has a timeout, kill checker if reached
if not spoof_proc.is_alive():
rc_p.kill()
# spoofing process died, assume timeout
print("Spoofing process timeout")
return False
def print_report():
print("\nFuzzy testing report summary:")
for k in keys:
print(f" {k}: {stats[k]}")
print(f"Time elapsed: {tcnt}[s]", end = "\r")
time.sleep(1)
tcnt += 1
def update_stats(matched, log, info):
if matched:
stats['success'] += 1
return
def main():
if len(sys.argv) < 2:
print(f"usage: {sys.argv[0]} <ip_addr> [-c]")
ip_addr = sys.argv[1]
stats['failed'] += 1
if log == "PROC CRASH":
stats['proc_crash'] += 1
if log == "CHECKER CRASHED":
stats['checker_crash'] += 1
if log == "TIMEOUT":
if "LAIKAD" in info:
stats['laikad_fail'] += 1
else: # "UBLOX" in info
stats['ublox_fail'] += 1
continuous_mode = False
if len(sys.argv) == 3 and sys.argv[2] == '-c':
print("Continuous Mode!")
continuous_mode = True
def main(ip_addr, continuous_mode, timeout, pos):
rinex_file = download_rinex()
duration = 60*3 # max runtime in seconds
lat, lon = get_random_coords(47.2020, 15.7403)
lat, lon, alt = pos
if lat == 0 and lon == 0 and alt == 0:
lat, lon, alt = get_random_coords(47.2020, 15.7403)
try:
while True:
# spoof random location
spoof_proc = run_lime_gps(rinex_file, f"{lat},{lon},{alt}", timeout)
while True:
# spoof random location
spoof_proc = run_lime_gps(rinex_file, f"{lat},{lon},100", duration)
start_time = time.monotonic()
# remote checker execs blocking
matched, log, info = run_remote_checker(lat, lon, alt, timeout, ip_addr)
update_stats(matched, log, info)
spoof_proc.terminate()
spoof_proc = None
# remote checker runs blocking
if not run_remote_checker(spoof_proc, lat, lon, duration, ip_addr):
# location could not be matched by ublox module
pass
if continuous_mode:
lat, lon, alt = get_continuous_coords(lat, lon, alt)
else:
lat, lon, alt = get_random_coords(lat, lon)
except KeyboardInterrupt:
if spoof_proc is not None:
spoof_proc.terminate()
end_time = time.monotonic()
spoof_proc.terminate()
if con is not None and not con.closed:
con.root.exposed_kill_procs()
con.close()
# -1 to count process startup
print(f"Time to get Signal: {round(end_time - start_time - 1, 4)}")
print_report()
if continuous_mode:
lat, lon = get_continuous_coords(lat, lon)
else:
lat, lon = get_random_coords(lat, lon)
if __name__ == "__main__":
main()
parser = argparse.ArgumentParser(description="Fuzzy test GPS stack with random locations.")
parser.add_argument("ip_addr", type=str)
parser.add_argument("-c", "--contin", type=bool, nargs='?', default=False, help='Continous location change')
parser.add_argument("-t", "--timeout", type=int, nargs='?', default=180, help='Timeout to get location')
# for replaying a location
parser.add_argument("lat", type=float, nargs='?', default=0)
parser.add_argument("lon", type=float, nargs='?', default=0)
parser.add_argument("alt", type=float, nargs='?', default=0)
args = parser.parse_args()
main(args.ip_addr, args.contin, args.timeout, (args.lat, args.lon, args.alt))

@ -0,0 +1,53 @@
import random
import datetime as dt
import subprocess as sp
from typing import Tuple
from laika.downloader import download_nav
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
def download_rinex():
# TODO: check if there is a better way to get the full brdc file for LimeGPS
gps_time = GPSTime.from_datetime(dt.datetime.utcnow())
utc_time = dt.datetime.utcnow() - dt.timedelta(1)
gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day))
return download_nav(gps_time, '/tmp/gpstest/', ConstellationId.GPS)
def exec_LimeGPS_bin(rinex_file: str, location: str, duration: int):
# this functions should never return, cause return means, timeout is
# reached or it crashed
try:
cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", location]
sp.check_output(cmd, timeout=duration)
except sp.TimeoutExpired:
print("LimeGPS timeout reached!")
except Exception as e:
print(f"LimeGPS crashed: {str(e)}")
def get_random_coords(lat, lon) -> Tuple[float, float, int]:
# jump around the world
# max values, lat: -90 to 90, lon: -180 to 180
lat_add = random.random()*20 + 10
lon_add = random.random()*20 + 20
alt = random.randint(-10**3, 4*10**3)
lat = ((lat + lat_add + 90) % 180) - 90
lon = ((lon + lon_add + 180) % 360) - 180
return round(lat, 5), round(lon, 5), alt
def get_continuous_coords(lat, lon, alt) -> Tuple[float, float, int]:
# continuously move around the world
lat_add = random.random()*0.01
lon_add = random.random()*0.01
alt_add = random.randint(-100, 100)
lat = ((lat + lat_add + 90) % 180) - 90
lon = ((lon + lon_add + 180) % 360) - 180
alt += alt_add
return round(lat, 5), round(lon, 5), alt

@ -1,54 +0,0 @@
#!/usr/bin/env python3
import sys
import time
from typing import List
from common.params import Params
import cereal.messaging as messaging
from selfdrive.manager.process_config import managed_processes
DELTA = 0.001
# assume running openpilot for now
procs: List[str] = []#"ubloxd", "pigeond"]
def main():
if len(sys.argv) != 4:
print("args: <latitude> <longitude>")
return
quectel_mod = Params().get_bool("UbloxAvailable")
sol_lat = float(sys.argv[2])
sol_lon = float(sys.argv[3])
for p in procs:
managed_processes[p].start()
time.sleep(0.5) # give time to startup
socket = 'gpsLocation' if quectel_mod else 'gpsLocationExternal'
gps_sock = messaging.sub_sock(socket, timeout=0.1)
# analyze until the location changed
while True:
events = messaging.drain_sock(gps_sock)
for e in events:
loc = e.gpsLocation if quectel_mod else e.gpsLocationExternal
lat = loc.latitude
lon = loc.longitude
if abs(lat - sol_lat) < DELTA and abs(lon - sol_lon) < DELTA:
print("MATCH")
return
time.sleep(0.1)
for p in procs:
if not managed_processes[p].proc.is_alive():
print(f"ERROR: '{p}' died")
return
if __name__ == "__main__":
main()
for p in procs:
managed_processes[p].stop()

@ -0,0 +1,185 @@
import os
import time
import shutil
from datetime import datetime
from collections import defaultdict
import rpyc # pylint: disable=import-error
from rpyc.utils.server import ThreadedServer # pylint: disable=import-error
#from common.params import Params
import cereal.messaging as messaging
from selfdrive.manager.process_config import managed_processes
from laika.lib.coordinates import ecef2geodetic
DELTA = 0.001
ALT_DELTA = 30
MATCH_NUM = 10
REPORT_STATS = 10
EPHEM_CACHE = "/data/params/d/LaikadEphemeris"
DOWNLOAD_CACHE = "/tmp/comma_download_cache"
SERVER_LOG_FILE = "/tmp/fuzzy_server.log"
server_log = open(SERVER_LOG_FILE, "w+")
def slog(msg):
server_log.write(f"{datetime.now().strftime('%H:%M:%S.%f')} | {msg}\n")
server_log.flush()
def handle_laikad(msg):
if not hasattr(msg, 'correctedMeasurements'):
return None
num_corr = len(msg.correctedMeasurements)
pos_ecef = msg.positionECEF.value
pos_geo = []
if len(pos_ecef) > 0:
pos_geo = ecef2geodetic(pos_ecef)
pos_std = msg.positionECEF.std
pos_valid = msg.positionECEF.valid
slog(f"{num_corr} {pos_geo} {pos_ecef} {pos_std} {pos_valid}")
return pos_geo, (num_corr, pos_geo, list(pos_ecef), list(msg.positionECEF.std))
hw_msgs = 0
ephem_msgs: dict = defaultdict(int)
def handle_ublox(msg):
global hw_msgs
d = msg.to_dict()
if 'hwStatus2' in d:
hw_msgs += 1
if 'ephemeris' in d:
ephem_msgs[msg.ephemeris.svId] += 1
num_meas = None
if 'measurementReport' in d:
num_meas = msg.measurementReport.numMeas
return [hw_msgs, ephem_msgs, num_meas]
def start_procs(procs):
for p in procs:
managed_processes[p].start()
time.sleep(1)
def kill_procs(procs, no_retry=False):
for p in procs:
managed_processes[p].stop()
time.sleep(1)
if not no_retry:
for p in procs:
mp = managed_processes[p].proc
if mp is not None and mp.is_alive():
managed_processes[p].stop()
time.sleep(3)
def check_alive_procs(procs):
for p in procs:
mp = managed_processes[p].proc
if mp is None or not mp.is_alive():
return False, p
return True, None
class RemoteCheckerService(rpyc.Service):
def on_connect(self, conn):
pass
def on_disconnect(self, conn):
#kill_procs(self.procs, no_retry=False)
# this execution is delayed, it will kill the next run of laikad
# TODO: add polling to wait for everything is killed
pass
def run_checker(self, slat, slon, salt, sockets, procs, timeout):
global hw_msgs, ephem_msgs
hw_msgs = 0
ephem_msgs = defaultdict(int)
slog(f"Run test: {slat} {slon} {salt}")
# quectel_mod = Params().get_bool("UbloxAvailable")
match_cnt = 0
msg_cnt = 0
stats_laikad = []
stats_ublox = []
self.procs = procs
start_procs(procs)
sm = messaging.SubMaster(sockets)
start_time = time.monotonic()
while True:
sm.update()
if sm.updated['ubloxGnss']:
stats_ublox.append(handle_ublox(sm['ubloxGnss']))
if sm.updated['gnssMeasurements']:
pos_geo, stats = handle_laikad(sm['gnssMeasurements'])
if pos_geo is None or len(pos_geo) == 0:
continue
match = all(abs(g-s) < DELTA for g,s in zip(pos_geo[:2], [slat, slon]))
match &= abs(pos_geo[2] - salt) < ALT_DELTA
if match:
match_cnt += 1
if match_cnt >= MATCH_NUM:
return True, "MATCH", f"After: {round(time.monotonic() - start_time, 4)}"
# keep some stats for error reporting
stats_laikad.append(stats)
if (msg_cnt % 10) == 0:
a, p = check_alive_procs(procs)
if not a:
return False, "PROC CRASH", f"{p}"
msg_cnt += 1
if (time.monotonic() - start_time) > timeout:
h = f"LAIKAD: {stats_laikad[-REPORT_STATS:]}"
if len(h) == 0:
h = f"UBLOX: {stats_ublox[-REPORT_STATS:]}"
return False, "TIMEOUT", h
def exposed_run_checker(self, slat, slon, salt, timeout=180, use_laikad=True):
try:
procs = []
sockets = []
if use_laikad:
procs.append("laikad") # pigeond, ubloxd # might wanna keep them running
sockets += ['ubloxGnss', 'gnssMeasurements']
if os.path.exists(EPHEM_CACHE):
os.remove(EPHEM_CACHE)
shutil.rmtree(DOWNLOAD_CACHE, ignore_errors=True)
ret = self.run_checker(slat, slon, salt, sockets, procs, timeout)
kill_procs(procs)
return ret
except Exception as e:
# always make sure processes get killed
kill_procs(procs)
return False, "CHECKER CRASHED", f"{str(e)}"
def exposed_kill_procs(self):
kill_procs(self.procs, no_retry=True)
if __name__ == "__main__":
print(f"Sever Log written to: {SERVER_LOG_FILE}")
t = ThreadedServer(RemoteCheckerService, port=18861)
t.start()

@ -16,7 +16,7 @@ cache_dir = '/tmp/gpstest/'
def download_rinex():
# TODO: check if there is a better way to get the full brdc file for LimeGPS
gps_time = GPSTime.from_datetime(dt.datetime.utcnow())
utc_time = dt.datetime.utcnow() - dt.timedelta(1)
utc_time = dt.datetime.utcnow()# - dt.timedelta(1)
gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day))
return download_nav(gps_time, cache_dir, ConstellationId.GPS)
@ -36,11 +36,15 @@ def get_random_coords(lat, lon) -> Tuple[int, int]:
# jump around the world
return get_coords(lat, lon, 20, 20, 10, 20)
def run_limeSDR_loop(lat, lon, contin_sim, rinex_file, timeout):
def run_limeSDR_loop(lat, lon, alt, contin_sim, rinex_file, timeout):
while True:
try:
print(f"starting LimeGPS, Location: {lat},{lon}")
cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", f"{lat},{lon},100"]
# TODO: add starttime setting and altitude
# -t 2023/01/15,00:00:00 -T 2023/01/15,00:00:00
# this needs to match the date of the navigation file
print(f"starting LimeGPS, Location: {lat} {lon} {alt}")
cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", f"{lat},{lon},{alt}"]
print(f"CMD: {cmd}")
sp.check_output(cmd, stderr=sp.PIPE, timeout=timeout)
except KeyboardInterrupt:
print("stopping LimeGPS")
@ -71,7 +75,7 @@ def run_hackRF_loop(lat, lon, rinex_file, timeout):
try:
print(f"starting gps-sdr-sim, Location: {lat},{lon}")
# create 30second file and replay with hackrf endless
cmd = ["gps-sdr-sim/gps-sdr-sim", "-e", rinex_file, "-l", f"{lat},{lon},100", "-d", "30"]
cmd = ["gps-sdr-sim/gps-sdr-sim", "-e", rinex_file, "-l", f"{lat},{lon},-200", "-d", "30"]
sp.check_output(cmd, stderr=sp.PIPE, timeout=timeout)
# created in current working directory
except Exception:
@ -90,7 +94,7 @@ def run_hackRF_loop(lat, lon, rinex_file, timeout):
print(f"hackrf_transfer crashed:{str(e)}")
def main(lat, lon, jump_sim, contin_sim, hackrf_mode):
def main(lat, lon, alt, jump_sim, contin_sim, hackrf_mode):
if hackrf_mode:
if not os.path.exists('hackrf'):
@ -130,17 +134,18 @@ def main(lat, lon, jump_sim, contin_sim, hackrf_mode):
if jump_sim:
timeout = 30
if not hackrf_mode:
run_limeSDR_loop(lat, lon, contin_sim, rinex_file, timeout)
else:
if hackrf_mode:
run_hackRF_loop(lat, lon, rinex_file, timeout)
else:
run_limeSDR_loop(lat, lon, alt, contin_sim, rinex_file, timeout)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simulate static [or random jumping] GPS signal.")
parser.add_argument("lat", type=float, nargs='?', default=0)
parser.add_argument("lon", type=float, nargs='?', default=0)
parser.add_argument("alt", type=float, nargs='?', default=0)
parser.add_argument("--jump", action="store_true", help="signal that jumps around the world")
parser.add_argument("--contin", action="store_true", help="continuously/slowly moving around the world")
parser.add_argument("--hackrf", action="store_true", help="hackrf mode (DEFAULT: LimeSDR)")
args = parser.parse_args()
main(args.lat, args.lon, args.jump, args.contin, args.hackrf)
main(args.lat, args.lon, args.alt, args.jump, args.contin, args.hackrf)

Loading…
Cancel
Save