You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							115 lines
						
					
					
						
							3.5 KiB
						
					
					
				
			
		
		
	
	
							115 lines
						
					
					
						
							3.5 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import argparse
 | 
						|
import multiprocessing
 | 
						|
import rpyc
 | 
						|
from collections import defaultdict
 | 
						|
 | 
						|
from helper import download_rinex, exec_LimeGPS_bin
 | 
						|
from helper import get_random_coords, get_continuous_coords
 | 
						|
 | 
						|
#------------------------------------------------------------------------------
 | 
						|
# this script is supposed to run on HOST PC
 | 
						|
# limeSDR is unreliable via c3 USB
 | 
						|
#------------------------------------------------------------------------------
 | 
						|
 | 
						|
 | 
						|
def run_lime_gps(rinex_file: str, location: str, timeout: int):
 | 
						|
  # needs to run longer than the checker
 | 
						|
  timeout += 10
 | 
						|
  print(f"LimeGPS {location} {timeout}")
 | 
						|
  p = multiprocessing.Process(target=exec_LimeGPS_bin,
 | 
						|
                              args=(rinex_file, location, timeout))
 | 
						|
  p.start()
 | 
						|
  return p
 | 
						|
 | 
						|
con = None
 | 
						|
def run_remote_checker(lat, lon, alt, duration, ip_addr):
 | 
						|
  global con
 | 
						|
  try:
 | 
						|
    con = rpyc.connect(ip_addr, 18861)
 | 
						|
    con._config['sync_request_timeout'] = duration+20
 | 
						|
  except ConnectionRefusedError:
 | 
						|
    print("could not run remote checker is 'rpc_server.py' running???")
 | 
						|
    return False, None, None
 | 
						|
 | 
						|
  matched, log, info = con.root.exposed_run_checker(lat, lon, alt,
 | 
						|
                        timeout=duration,
 | 
						|
                        use_laikad=True)
 | 
						|
  con.close() # TODO: might wanna fetch more logs here
 | 
						|
  con = None
 | 
						|
 | 
						|
  print(f"Remote Checker: {log} {info}")
 | 
						|
  return matched, log, info
 | 
						|
 | 
						|
 | 
						|
stats = defaultdict(int) # type: ignore
 | 
						|
keys = ['success', 'failed', 'ublox_fail', 'laikad_fail', 'proc_crash', 'checker_crash']
 | 
						|
 | 
						|
def print_report():
 | 
						|
  print("\nFuzzy testing report summary:")
 | 
						|
  for k in keys:
 | 
						|
    print(f"  {k}: {stats[k]}")
 | 
						|
 | 
						|
 | 
						|
def update_stats(matched, log, info):
 | 
						|
  if matched:
 | 
						|
    stats['success'] += 1
 | 
						|
    return
 | 
						|
 | 
						|
  stats['failed'] += 1
 | 
						|
  if log == "PROC CRASH":
 | 
						|
    stats['proc_crash'] += 1
 | 
						|
  if log == "CHECKER CRASHED":
 | 
						|
    stats['checker_crash'] += 1
 | 
						|
  if log == "TIMEOUT":
 | 
						|
    if "LAIKAD" in info:
 | 
						|
      stats['laikad_fail'] += 1
 | 
						|
    else: # "UBLOX" in info
 | 
						|
      stats['ublox_fail'] += 1
 | 
						|
 | 
						|
 | 
						|
def main(ip_addr, continuous_mode, timeout, pos):
 | 
						|
  rinex_file = download_rinex()
 | 
						|
 | 
						|
  lat, lon, alt = pos
 | 
						|
  if lat == 0 and lon == 0 and alt == 0:
 | 
						|
    lat, lon, alt = get_random_coords(47.2020, 15.7403)
 | 
						|
 | 
						|
  try:
 | 
						|
    while True:
 | 
						|
      # spoof random location
 | 
						|
      spoof_proc = run_lime_gps(rinex_file, f"{lat},{lon},{alt}", timeout)
 | 
						|
 | 
						|
      # remote checker execs blocking
 | 
						|
      matched, log, info = run_remote_checker(lat, lon, alt, timeout, ip_addr)
 | 
						|
      update_stats(matched, log, info)
 | 
						|
      spoof_proc.terminate()
 | 
						|
      spoof_proc = None
 | 
						|
 | 
						|
      if continuous_mode:
 | 
						|
        lat, lon, alt = get_continuous_coords(lat, lon, alt)
 | 
						|
      else:
 | 
						|
        lat, lon, alt = get_random_coords(lat, lon)
 | 
						|
  except KeyboardInterrupt:
 | 
						|
    if spoof_proc is not None:
 | 
						|
      spoof_proc.terminate()
 | 
						|
 | 
						|
    if con is not None and not con.closed:
 | 
						|
      con.root.exposed_kill_procs()
 | 
						|
      con.close()
 | 
						|
 | 
						|
    print_report()
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  parser = argparse.ArgumentParser(description="Fuzzy test GPS stack with random locations.")
 | 
						|
  parser.add_argument("ip_addr", type=str)
 | 
						|
  parser.add_argument("-c", "--contin", type=bool, nargs='?', default=False, help='Continous location change')
 | 
						|
  parser.add_argument("-t", "--timeout", type=int, nargs='?', default=180, help='Timeout to get location')
 | 
						|
 | 
						|
  # for replaying a location
 | 
						|
  parser.add_argument("lat", type=float, nargs='?', default=0)
 | 
						|
  parser.add_argument("lon", type=float, nargs='?', default=0)
 | 
						|
  parser.add_argument("alt", type=float, nargs='?', default=0)
 | 
						|
  args = parser.parse_args()
 | 
						|
  main(args.ip_addr, args.contin, args.timeout, (args.lat, args.lon, args.alt))
 | 
						|
 |