openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

115 lines
3.6 KiB

#!/usr/bin/env python3
import argparse
import multiprocessing
import rpyc # pylint: disable=import-error
from collections import defaultdict
from helper import download_rinex, exec_LimeGPS_bin
from helper import get_random_coords, get_continuous_coords
#------------------------------------------------------------------------------
# this script is supposed to run on HOST PC
# limeSDR is unreliable via c3 USB
#------------------------------------------------------------------------------
def run_lime_gps(rinex_file: str, location: str, timeout: int):
    """Launch the LimeGPS binary in a separate process and return that process.

    Args:
        rinex_file: passed through unchanged to ``exec_LimeGPS_bin``.
        location: location string passed through to ``exec_LimeGPS_bin``
            (the caller in this file formats it as "lat,lon,alt").
        timeout: checker timeout; the spoofer is given 10 more than this.

    Returns:
        The already-started ``multiprocessing.Process``.
    """
    # needs to run longer than the checker
    spoof_duration = timeout + 10
    print(f"LimeGPS {location} {spoof_duration}")

    spoofer = multiprocessing.Process(
        target=exec_LimeGPS_bin,
        args=(rinex_file, location, spoof_duration),
    )
    spoofer.start()
    return spoofer
# rpyc connection to the remote checker. run_remote_checker() assigns it while
# a check is running and resets it to None afterwards; main() reads it on
# KeyboardInterrupt to kill the remote processes and close the connection.
con = None
def run_remote_checker(lat, lon, alt, duration, ip_addr):
    """Run the checker on the remote rpyc server and return its verdict.

    Connects to ``ip_addr`` on port 18861 and calls the server's
    ``exposed_run_checker`` with the given coordinates. The call blocks until
    the remote side answers.

    Args:
        lat, lon, alt: coordinates forwarded to the remote checker.
        duration: forwarded as the checker's ``timeout``; the rpyc request
            timeout is set 20 higher than this.
        ip_addr: host running the rpyc server.

    Returns:
        ``(matched, log, info)`` as returned by the remote checker, or
        ``(False, None, None)`` when the connection is refused.
    """
    global con

    try:
        con = rpyc.connect(ip_addr, 18861)
        con._config['sync_request_timeout'] = duration+20
    except ConnectionRefusedError:
        print("could not run remote checker is 'rpc_server.py' running???")
        return False, None, None

    # The module-level connection stays open while this call blocks, so the
    # KeyboardInterrupt handler in main() can still reach the remote side.
    verdict = con.root.exposed_run_checker(
        lat, lon, alt,
        timeout=duration,
        use_laikad=True,
    )
    matched, log, info = verdict

    con.close() # TODO: might wanna fetch more logs here
    con = None

    print(f"Remote Checker: {log} {info}")
    return matched, log, info
# Outcome counters, updated by update_stats(); defaultdict(int) makes every
# counter start at 0 the first time it is touched.
stats = defaultdict(int) # type: ignore
# Counter names printed by print_report(), in the order they are shown.
keys = ['success', 'failed', 'ublox_fail', 'laikad_fail', 'proc_crash', 'checker_crash']
def print_report():
    """Print every counter listed in ``keys`` with its current value from ``stats``."""
    print("\nFuzzy testing report summary:")
    for name in keys:
        count = stats[name]
        print(f" {name}: {count}")
def update_stats(matched, log, info):
    """Record the outcome of one checker run in the global ``stats`` counters.

    Args:
        matched: truthy when the checker reported a match; counted as a
            success and nothing else is recorded.
        log: status string from the checker. "PROC CRASH", "CHECKER CRASHED"
            and "TIMEOUT" each bump a dedicated failure counter.
        info: only inspected when ``log`` is "TIMEOUT"; if it contains
            "LAIKAD" the failure is attributed to laikad, otherwise to ublox.
    """
    if matched:
        stats['success'] += 1
        return

    stats['failed'] += 1

    # The three status strings are mutually exclusive, so a single chain
    # selects the failure-cause counter (if any).
    if log == "PROC CRASH":
        cause = 'proc_crash'
    elif log == "CHECKER CRASHED":
        cause = 'checker_crash'
    elif log == "TIMEOUT":
        # anything that is not a LAIKAD timeout is counted as a UBLOX one
        cause = 'laikad_fail' if "LAIKAD" in info else 'ublox_fail'
    else:
        return

    stats[cause] += 1
def main(ip_addr, continuous_mode, timeout, pos):
    """Run the fuzzy GPS test loop until interrupted with Ctrl-C.

    Each iteration starts LimeGPS spoofing at the current coordinates, runs
    the blocking remote checker against them, records the outcome via
    ``update_stats`` and terminates the spoofing process before choosing the
    next coordinates. On KeyboardInterrupt the spoofing process and any open
    remote connection are cleaned up and the summary report is printed.

    Args:
        ip_addr: host running the rpyc checker server.
        continuous_mode: when truthy the next coordinates come from
            ``get_continuous_coords``; otherwise from ``get_random_coords``.
        timeout: timeout handed to both the spoofer and the remote checker.
        pos: ``(lat, lon, alt)`` start position; if all three are 0 a start
            position is drawn with ``get_random_coords(47.2020, 15.7403)``.
    """
    rinex_file = download_rinex()

    lat, lon, alt = pos
    if lat == 0 and lon == 0 and alt == 0:
        lat, lon, alt = get_random_coords(47.2020, 15.7403)

    # Bind the name before entering the try block: the KeyboardInterrupt
    # handler reads it, and an interrupt arriving before the first process
    # has been created would otherwise raise UnboundLocalError there.
    spoof_proc = None
    try:
        while True:
            # spoof random location
            spoof_proc = run_lime_gps(rinex_file, f"{lat},{lon},{alt}", timeout)

            # remote checker execs blocking
            matched, log, info = run_remote_checker(lat, lon, alt, timeout, ip_addr)
            update_stats(matched, log, info)

            spoof_proc.terminate()
            spoof_proc = None

            if continuous_mode:
                lat, lon, alt = get_continuous_coords(lat, lon, alt)
            else:
                lat, lon, alt = get_random_coords(lat, lon)
    except KeyboardInterrupt:
        if spoof_proc is not None:
            spoof_proc.terminate()

        # run_remote_checker() leaves the connection open while a check is
        # in flight, so the remote processes can still be killed from here.
        if con is not None and not con.closed:
            con.root.exposed_kill_procs()
            con.close()

        print_report()
def _str_to_bool(value):
    """Interpret a command-line string as a boolean.

    argparse passes option values as strings and ``bool("false")`` is True,
    so the text is compared against common "false" spellings instead. Any
    other text counts as True.
    """
    return value.strip().lower() not in ("", "0", "false", "no", "off")


def parse_args(argv=None):
    """Parse the command line of the fuzzy GPS test script.

    Args:
        argv: argument list to parse; ``None`` makes argparse use ``sys.argv``.

    Returns:
        The parsed ``argparse.Namespace`` with ``ip_addr``, ``contin``,
        ``timeout``, ``lat``, ``lon`` and ``alt``.
    """
    parser = argparse.ArgumentParser(description="Fuzzy test GPS stack with random locations.")
    parser.add_argument("ip_addr", type=str)
    # A bare "-c" now enables continuous mode (const=True); an explicit value
    # such as "-c false" is parsed by _str_to_bool rather than bool().
    parser.add_argument("-c", "--contin", type=_str_to_bool, nargs='?', const=True, default=False,
                        help='Continous location change')
    # const mirrors the default so a bare "-t" does not produce None.
    parser.add_argument("-t", "--timeout", type=int, nargs='?', const=180, default=180,
                        help='Timeout to get location')

    # for replaying a location
    parser.add_argument("lat", type=float, nargs='?', default=0)
    parser.add_argument("lon", type=float, nargs='?', default=0)
    parser.add_argument("alt", type=float, nargs='?', default=0)
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args()
    main(args.ip_addr, args.contin, args.timeout, (args.lat, args.lon, args.alt))