You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							142 lines
						
					
					
						
							4.1 KiB
						
					
					
				
			
		
		
	
	
							142 lines
						
					
					
						
							4.1 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import sys
 | |
| import time
 | |
| import random
 | |
| import datetime as dt
 | |
| import subprocess as sp
 | |
| import multiprocessing
 | |
| import threading
 | |
| from typing import Tuple, Any
 | |
| 
 | |
| from laika.downloader import download_nav
 | |
| from laika.gps_time import GPSTime
 | |
| from laika.helpers import ConstellationId
 | |
| 
 | |
| cache_dir = '/tmp/gpstest/'
 | |
| 
 | |
| 
 | |
| def download_rinex():
 | |
|   # TODO: check if there is a better way to get the full brdc file for LimeGPS
 | |
|   gps_time = GPSTime.from_datetime(dt.datetime.utcnow())
 | |
|   utc_time = dt.datetime.utcnow() - dt.timedelta(1)
 | |
|   gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day))
 | |
|   return download_nav(gps_time, cache_dir, ConstellationId.GPS)
 | |
| 
 | |
| 
 | |
| def exec_LimeGPS_bin(rinex_file: str, location: str, duration: int):
 | |
|   # this functions should never return, cause return means, timeout is
 | |
|   # reached or it crashed
 | |
|   try:
 | |
|     cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", location]
 | |
|     sp.check_output(cmd, timeout=duration)
 | |
|   except sp.TimeoutExpired:
 | |
|     print("LimeGPS timeout reached!")
 | |
|   except Exception as e:
 | |
|     print(f"LimeGPS crashed: {str(e)}")
 | |
| 
 | |
| 
 | |
| def run_lime_gps(rinex_file: str, location: str, duration: int):
 | |
|   print(f"LimeGPS {location} {duration}")
 | |
| 
 | |
|   p = multiprocessing.Process(target=exec_LimeGPS_bin,
 | |
|                               args=(rinex_file, location, duration))
 | |
|   p.start()
 | |
|   return p
 | |
| 
 | |
| 
 | |
| def get_random_coords(lat, lon) -> Tuple[int, int]:
 | |
|   # jump around the world
 | |
|   # max values, lat: -90 to 90, lon: -180 to 180
 | |
| 
 | |
|   lat_add = random.random()*20 + 10
 | |
|   lon_add = random.random()*20 + 20
 | |
| 
 | |
|   lat = ((lat + lat_add + 90) % 180) - 90
 | |
|   lon = ((lon + lon_add + 180) % 360) - 180
 | |
|   return round(lat, 5), round(lon, 5)
 | |
| 
 | |
| def get_continuous_coords(lat, lon) -> Tuple[int, int]:
 | |
|   # continuously move around the world
 | |
| 
 | |
|   lat_add = random.random()*0.01
 | |
|   lon_add = random.random()*0.01
 | |
| 
 | |
|   lat = ((lat + lat_add + 90) % 180) - 90
 | |
|   lon = ((lon + lon_add + 180) % 360) - 180
 | |
|   return round(lat, 5), round(lon, 5)
 | |
| 
 | |
| rc_p: Any = None
 | |
| def exec_remote_checker(lat, lon, duration):
 | |
|   global rc_p
 | |
|   # TODO: good enough for testing
 | |
|   remote_cmd =  "export PYTHONPATH=/data/pythonpath:/data/pythonpath/pyextra && "
 | |
|   remote_cmd += "cd /data/openpilot && "
 | |
|   remote_cmd += f"timeout {duration} /usr/local/pyenv/shims/python tools/gpstest/remote_checker.py "
 | |
|   remote_cmd += f"{lat} {lon}"
 | |
| 
 | |
|   ssh_cmd = ['ssh', '-i', '/home/batman/openpilot/xx/phone/key/id_rsa',
 | |
|              'comma@192.168.60.130']
 | |
|   ssh_cmd += [remote_cmd]
 | |
| 
 | |
|   rc_p = sp.Popen(ssh_cmd, stdout=sp.PIPE)
 | |
|   rc_p.wait()
 | |
|   rc_output = rc_p.stdout.read()
 | |
|   print(f"Checker Result: {rc_output.strip().decode('utf-8')}")
 | |
| 
 | |
| 
 | |
| def run_remote_checker(spoof_proc, lat, lon, duration) -> bool:
 | |
|   checker_thread = threading.Thread(target=exec_remote_checker, args=(lat, lon, duration))
 | |
|   checker_thread.start()
 | |
| 
 | |
|   tcnt = 0
 | |
|   while True:
 | |
|     if not checker_thread.is_alive():
 | |
|       # assume this only happens when the signal got matched
 | |
|       return True
 | |
| 
 | |
|     # the spoofing process has a timeout, kill checker if reached
 | |
|     if not spoof_proc.is_alive():
 | |
|       rc_p.kill()
 | |
|       # spoofing process died, assume timeout
 | |
|       print("Spoofing process timeout")
 | |
|       return False
 | |
| 
 | |
|     print(f"Time elapsed: {tcnt}[s]", end = "\r")
 | |
|     time.sleep(1)
 | |
|     tcnt += 1
 | |
| 
 | |
| 
 | |
| def main():
 | |
|   continuous_mode = False
 | |
|   if len(sys.argv) == 2 and sys.argv[1] == '-c':
 | |
|     print("Continuous Mode!")
 | |
|     continuous_mode = True
 | |
| 
 | |
|   rinex_file = download_rinex()
 | |
| 
 | |
|   duration = 60*3 # max runtime in seconds
 | |
|   lat, lon = get_random_coords(47.2020, 15.7403)
 | |
| 
 | |
|   while True:
 | |
|     # spoof random location
 | |
|     spoof_proc = run_lime_gps(rinex_file, f"{lat},{lon},100", duration)
 | |
|     start_time = time.monotonic()
 | |
| 
 | |
|     # remote checker runs blocking
 | |
|     if not run_remote_checker(spoof_proc, lat, lon, duration):
 | |
|       # location could not be matched by ublox module
 | |
|       pass
 | |
| 
 | |
|     end_time = time.monotonic()
 | |
|     spoof_proc.terminate()
 | |
| 
 | |
|     # -1 to count process startup
 | |
|     print(f"Time to get Signal: {round(end_time - start_time - 1, 4)}")
 | |
| 
 | |
|     if continuous_mode:
 | |
|       lat, lon = get_continuous_coords(lat, lon)
 | |
|     else:
 | |
|       lat, lon = get_random_coords(lat, lon)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   main()
 | |
| 
 |