You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							54 lines
						
					
					
						
							1.3 KiB
						
					
					
				
			
		
		
	
	
							54 lines
						
					
					
						
							1.3 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import sys
 | 
						|
import time
 | 
						|
from typing import List
 | 
						|
 | 
						|
from common.params import Params
 | 
						|
import cereal.messaging as messaging
 | 
						|
from selfdrive.manager.process_config import managed_processes
 | 
						|
 | 
						|
# Maximum absolute difference, per coordinate, between a reported position and
# the target for the two to be considered a match.
DELTA = 0.001

# Names of managed processes to start before checking and to stop afterwards.
# Left empty because openpilot is assumed to be running already; the entries
# previously listed here were "ubloxd" and "pigeond".
procs: List[str] = []
 | 
						|
 | 
						|
 | 
						|
def main():
  """Wait until the reported GPS position matches the target from the command line.

  Usage: ``<latitude> <longitude>``. For backward compatibility one extra
  leading argument is also accepted and ignored: the original check required
  exactly three arguments but only ever read the last two.

  Starts every process named in ``procs``, subscribes to a GPS location
  service and polls it. Prints "MATCH" and returns as soon as a received fix
  is within ``DELTA`` of the target in both latitude and longitude. Prints an
  error and returns if one of the started processes is no longer alive.
  There is no timeout: without a match (and with all processes alive) the
  loop keeps polling.

  Raises:
    ValueError: if latitude or longitude cannot be parsed as a float.
  """
  args = sys.argv[1:]
  if len(args) not in (2, 3):
    print("args: <latitude> <longitude>")
    return

  # The target is always the last two arguments, which covers both the
  # two-argument form and the legacy three-argument form.
  sol_lat = float(args[-2])
  sol_lon = float(args[-1])

  # NOTE(review): the original stored this flag in a local called
  # `quectel_mod`, i.e. "UbloxAvailable" being set selects 'gpsLocation'.
  # The mapping is kept unchanged here; confirm it is not inverted.
  ublox_available = Params().get_bool("UbloxAvailable")
  service = 'gpsLocation' if ublox_available else 'gpsLocationExternal'

  for p in procs:
    managed_processes[p].start()
    time.sleep(0.5) # give time to startup

  gps_sock = messaging.sub_sock(service, timeout=0.1)

  # poll until a received location matches the target
  while True:
    for event in messaging.drain_sock(gps_sock):
      # The message field holding the location has the same name as the service.
      loc = getattr(event, service)

      if abs(loc.latitude - sol_lat) < DELTA and abs(loc.longitude - sol_lon) < DELTA:
        print("MATCH")
        return

    time.sleep(0.1)

    # Give up if any process we started has died.
    for p in procs:
      if not managed_processes[p].proc.is_alive():
        print(f"ERROR: '{p}' died")
        return
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
  try:
    main()
  finally:
    # Stop the processes listed in `procs` even when main() exits through an
    # exception (for example Ctrl-C while it is still waiting for a match).
    for p in procs:
      managed_processes[p].stop()
 | 
						|
 |