|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import sys
 | 
					
						
							|  |  |  | import time
 | 
					
						
							|  |  |  | from typing import List
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from common.params import Params
 | 
					
						
							|  |  |  | import cereal.messaging as messaging
 | 
					
						
							|  |  |  | from selfdrive.manager.process_config import managed_processes
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | DELTA = 0.001
 | 
					
						
							|  |  |  | # assume running openpilot for now
 | 
					
						
							|  |  |  | procs: List[str] = []#"ubloxd", "pigeond"]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main():
 | 
					
						
							|  |  |  |   if len(sys.argv) != 4:
 | 
					
						
							|  |  |  |     print("args: <latitude> <longitude>")
 | 
					
						
							|  |  |  |     return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   quectel_mod = Params().get_bool("UbloxAvailable")
 | 
					
						
							|  |  |  |   sol_lat = float(sys.argv[2])
 | 
					
						
							|  |  |  |   sol_lon = float(sys.argv[3])
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   for p in procs:
 | 
					
						
							|  |  |  |     managed_processes[p].start()
 | 
					
						
							|  |  |  |     time.sleep(0.5) # give time to startup
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   socket = 'gpsLocation' if quectel_mod else 'gpsLocationExternal'
 | 
					
						
							|  |  |  |   gps_sock = messaging.sub_sock(socket, timeout=0.1)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # analyze until the location changed
 | 
					
						
							|  |  |  |   while True:
 | 
					
						
							|  |  |  |     events = messaging.drain_sock(gps_sock)
 | 
					
						
							|  |  |  |     for e in events:
 | 
					
						
							|  |  |  |       loc = e.gpsLocation if quectel_mod else e.gpsLocationExternal
 | 
					
						
							|  |  |  |       lat = loc.latitude
 | 
					
						
							|  |  |  |       lon = loc.longitude
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if abs(lat - sol_lat) < DELTA and abs(lon - sol_lon) < DELTA:
 | 
					
						
							|  |  |  |         print("MATCH")
 | 
					
						
							|  |  |  |         return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     time.sleep(0.1)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for p in procs:
 | 
					
						
							|  |  |  |       if not managed_processes[p].proc.is_alive():
 | 
					
						
							|  |  |  |         print(f"ERROR: '{p}' died")
 | 
					
						
							|  |  |  |         return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__":
 | 
					
						
							|  |  |  |   main()
 | 
					
						
							|  |  |  |   for p in procs:
 | 
					
						
							|  |  |  |     managed_processes[p].stop()
 |