#!/usr/bin/env python3
import argparse
import multiprocessing
import rpyc # pylint: disable=import-error
from collections import defaultdict
from helper import download_rinex , exec_LimeGPS_bin
from helper import get_random_coords , get_continuous_coords
#------------------------------------------------------------------------------
# This script is supposed to run on the host PC:
# the LimeSDR is unreliable via the c3 USB.
#------------------------------------------------------------------------------
def run_lime_gps(rinex_file: str, location: str, timeout: int):
    """Launch ``exec_LimeGPS_bin`` in its own process and hand the process back.

    Args:
        rinex_file: forwarded unchanged as the first argument of
            ``exec_LimeGPS_bin``.
        location: forwarded unchanged as the second argument.
        timeout: the checker's timeout; 10 is added to it before it is
            forwarded as the third argument.

    Returns:
        The ``multiprocessing.Process`` object, already started.
    """
    # the spoofer has to outlive the checker, so give it some extra time
    timeout = timeout + 10
    print(f" LimeGPS { location } { timeout } ")

    spoofer = multiprocessing.Process(
        target=exec_LimeGPS_bin,
        args=(rinex_file, location, timeout),
    )
    spoofer.start()
    return spoofer
# Connection of the checker call that is currently in flight (None otherwise).
# It is module-level so that main() can still reach the remote side when the
# blocking call below is interrupted.
con = None


def run_remote_checker(lat, lon, alt, duration, ip_addr):
    """Run the checker on the remote rpyc server and return its verdict.

    Connects to ``ip_addr`` on port 18861 and calls the remote
    ``exposed_run_checker`` with ``timeout=duration`` and ``use_laikad=True``.
    The call blocks until the remote side answers.

    Args:
        lat, lon, alt: position forwarded to the remote checker.
        duration: timeout handed to the remote checker; the rpyc request
            timeout is set to ``duration + 20`` so the request outlasts it.
        ip_addr: address of the machine running the rpyc server.

    Returns:
        The ``(matched, log, info)`` tuple produced by the remote checker, or
        ``(False, None, None)`` when the connection is refused.
    """
    global con
    try:
        # Pass the timeout through the public ``config`` argument under the
        # exact key rpyc looks up; a differently spelled key is silently
        # ignored and the default request timeout would stay in force.
        con = rpyc.connect(ip_addr, 18861,
                           config={'sync_request_timeout': duration + 20})
    except ConnectionRefusedError:
        print(" could not run remote checker is ' rpc_server.py ' running??? ")
        return False, None, None

    # No try/finally on purpose: if this call is interrupted, ``con`` has to
    # stay open so main() can ask the remote side to kill its processes.
    matched, log, info = con.root.exposed_run_checker(lat, lon, alt,
                                                      timeout=duration,
                                                      use_laikad=True)
    con.close()  # TODO: might wanna fetch more logs here
    con = None

    print(f" Remote Checker: { log } { info } ")
    return matched, log, info
# Running tally of test outcomes; a counter that was never touched reads as 0.
stats = defaultdict(int)  # type: ignore

# Counters listed by print_report(), in the order they are printed.
keys = [
    ' success ',
    ' failed ',
    ' ublox_fail ',
    ' laikad_fail ',
    ' proc_crash ',
    ' checker_crash ',
]
def print_report():
    """Print a header line followed by one line per entry of ``keys``.

    Each line shows the key and its current counter from ``stats``; counters
    that were never incremented show up as 0 because ``stats`` is a
    ``defaultdict(int)``.
    """
    header = " \n Fuzzy testing report summary: "
    print(header)
    for k in keys:
        print(f" { k } : { stats [ k ] } ")
def update_stats(matched, log, info):
    """Record the outcome of one checker run in ``stats``.

    A truthy ``matched`` counts as a success and nothing else is recorded.
    Otherwise the run counts as failed and, depending on ``log``, one more
    specific counter is bumped as well. All comparisons are exact string
    matches against the literals below, surrounding spaces included.

    Args:
        matched: truthy when the checker reported a match.
        log: status string of the run; values other than the three handled
            below only increase the generic failure counter.
        info: only inspected when ``log`` is the timeout marker, to tell a
            laikad failure from a ublox failure.
    """
    if not matched:
        stats[' failed '] += 1

        if log == " TIMEOUT ":
            # anything that is not a laikad timeout is counted as ublox
            bucket = ' laikad_fail ' if " LAIKAD " in info else ' ublox_fail '
            stats[bucket] += 1
        elif log == " PROC CRASH ":
            stats[' proc_crash '] += 1
        elif log == " CHECKER CRASHED ":
            stats[' checker_crash '] += 1
        return

    stats[' success '] += 1
def main(ip_addr, continuous_mode, timeout, pos):
    """Spoof one GPS location after another and tally the checker's verdicts.

    Each round starts the LimeGPS spoofer for the current position, runs the
    (blocking) remote checker, records the result, stops the spoofer and picks
    the next position. The loop only ends through Ctrl-C or an exception; in
    both cases the local spoofer is terminated and the report is printed.

    Args:
        ip_addr: address handed to run_remote_checker().
        continuous_mode: if truthy the next position comes from
            get_continuous_coords(lat, lon, alt), otherwise from
            get_random_coords(lat, lon).
        timeout: passed to both run_lime_gps() and run_remote_checker().
        pos: ``(lat, lon, alt)`` start position; ``(0, 0, 0)`` means "none
            given" and is replaced by get_random_coords(47.2020, 15.7403).
    """
    rinex_file = download_rinex()
    lat, lon, alt = pos
    if lat == 0 and lon == 0 and alt == 0:
        lat, lon, alt = get_random_coords(47.2020, 15.7403)

    # Bound before the try block so the cleanup below never hits an unbound
    # name when the interrupt arrives before the first spoofer was started.
    spoof_proc = None
    try:
        while True:
            # spoof random location
            # NOTE(review): the location string is passed on exactly as
            # written here, spaces included - confirm the format that
            # exec_LimeGPS_bin expects.
            spoof_proc = run_lime_gps(rinex_file, f" { lat } , { lon } , { alt } ", timeout)

            # remote checker execs blocking
            matched, log, info = run_remote_checker(lat, lon, alt, timeout, ip_addr)
            update_stats(matched, log, info)

            spoof_proc.terminate()
            spoof_proc = None

            if continuous_mode:
                lat, lon, alt = get_continuous_coords(lat, lon, alt)
            else:
                lat, lon, alt = get_random_coords(lat, lon)
    except KeyboardInterrupt:
        # A checker call may still be in flight: ask the remote side to stop
        # its processes before dropping the connection.
        if con is not None and not con.closed:
            con.root.exposed_kill_procs()
            con.close()
    finally:
        # Runs for Ctrl-C and for unexpected errors alike, so the spoofer is
        # never left running and the collected stats are always shown.
        if spoof_proc is not None:
            spoof_proc.terminate()
        print_report()
def parse_bool_arg(value):
    """Convert a command line word such as ``true`` / ``no`` / ``1`` to a bool.

    ``type=bool`` cannot be used for this: ``bool("False")`` is True because
    every non-empty string is truthy.

    Raises:
        argparse.ArgumentTypeError: if the word is not a recognised boolean.
    """
    word = value.strip().lower()
    if word in ("1", "true", "t", "yes", "y", "on"):
        return True
    if word in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=" Fuzzy test GPS stack with random locations. ")
    parser.add_argument("ip_addr", type=str)
    # const=True makes a bare "-c" switch the mode on; without it a bare flag
    # would yield None. An explicit value ("-c false") is still accepted.
    parser.add_argument("-c", "--contin", type=parse_bool_arg, nargs='?', const=True, default=False,
                        help=' Continous location change ')
    # const mirrors the default so a bare "-t" does not yield None.
    parser.add_argument("-t", "--timeout", type=int, nargs='?', const=180, default=180,
                        help=' Timeout to get location ')

    # for replaying a location
    parser.add_argument("lat", type=float, nargs='?', default=0)
    parser.add_argument("lon", type=float, nargs='?', default=0)
    parser.add_argument("alt", type=float, nargs='?', default=0)

    args = parser.parse_args()
    main(args.ip_addr, args.contin, args.timeout, (args.lat, args.lon, args.alt))