You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					166 lines
				
				4.0 KiB
			
		
		
			
		
	
	
					166 lines
				
				4.0 KiB
			| 
											2 years ago
										 | #!/usr/bin/env python3
 | ||
|  | import os
 | ||
|  | import time
 | ||
|  | import traceback
 | ||
|  | import serial
 | ||
|  | import datetime
 | ||
|  | import numpy as np
 | ||
|  | from collections import defaultdict
 | ||
|  | 
 | ||
|  | from cereal import log
 | ||
|  | import cereal.messaging as messaging
 | ||
|  | from openpilot.common.retry import retry
 | ||
|  | from openpilot.common.swaglog import cloudlog
 | ||
|  | from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem
 | ||
|  | 
 | ||
|  | 
 | ||
|  | def sfloat(n: str):
 | ||
|  |   return float(n) if len(n) > 0 else 0
 | ||
|  | 
 | ||
|  | def checksum(s: str):
 | ||
|  |   ret = 0
 | ||
|  |   for c in s[1:-3]:
 | ||
|  |     ret ^= ord(c)
 | ||
|  |   return format(ret, '02X')
 | ||
|  | 
 | ||
|  | class Unicore:
 | ||
|  |   def __init__(self):
 | ||
|  |     self.s = serial.Serial('/dev/ttyHS0', 115200)
 | ||
|  |     self.s.timeout = 1
 | ||
|  |     self.s.writeTimeout = 1
 | ||
|  |     self.s.newline = b'\r\n'
 | ||
|  | 
 | ||
|  |     self.s.flush()
 | ||
|  |     self.s.reset_input_buffer()
 | ||
|  |     self.s.reset_output_buffer()
 | ||
|  |     self.s.read(2048)
 | ||
|  | 
 | ||
|  |   def send(self, cmd):
 | ||
|  |     self.s.write(cmd.encode('utf8') + b'\r')
 | ||
|  |     resp = self.s.read(2048)
 | ||
|  |     print(len(resp), cmd, "\n", resp)
 | ||
|  |     assert b"OK" in resp
 | ||
|  | 
 | ||
|  |   def recv(self):
 | ||
|  |     return self.s.readline()
 | ||
|  | 
 | ||
|  | def build_msg(state):
 | ||
|  |   """
 | ||
|  |     NMEA sentences:
 | ||
|  |       https://campar.in.tum.de/twiki/pub/Chair/NaviGpsDemon/nmea.html#RMC
 | ||
|  |     NAV messages:
 | ||
|  |       https://www.unicorecomm.com/assets/upload/file/UFirebird_Standard_Positioning_Products_Protocol_Specification_CH.pdf
 | ||
|  |   """
 | ||
|  | 
 | ||
|  |   msg = messaging.new_message('gpsLocation', valid=True)
 | ||
|  |   gps = msg.gpsLocation
 | ||
|  | 
 | ||
|  |   gnrmc = state['$GNRMC']
 | ||
|  |   gps.hasFix = gnrmc[1] == 'A'
 | ||
|  |   gps.source = log.GpsLocationData.SensorSource.unicore
 | ||
|  |   gps.latitude = (sfloat(gnrmc[3][:2]) + (sfloat(gnrmc[3][2:]) / 60)) * (1 if gnrmc[4] == "N" else -1)
 | ||
|  |   gps.longitude = (sfloat(gnrmc[5][:3]) + (sfloat(gnrmc[5][3:]) / 60)) * (1 if gnrmc[6] == "E" else -1)
 | ||
|  | 
 | ||
|  |   try:
 | ||
|  |     date = gnrmc[9][:6]
 | ||
|  |     dt = datetime.datetime.strptime(f"{date} {gnrmc[1]}", '%d%m%y %H%M%S.%f')
 | ||
|  |     gps.unixTimestampMillis = dt.timestamp()*1e3
 | ||
|  |   except Exception:
 | ||
|  |     pass
 | ||
|  | 
 | ||
|  |   gps.bearingDeg = sfloat(gnrmc[8])
 | ||
|  | 
 | ||
|  |   if len(state['$GNGGA']):
 | ||
|  |     gngga = state['$GNGGA']
 | ||
|  |     if gngga[10] == 'M':
 | ||
|  |       gps.altitude = sfloat(gngga[9])
 | ||
|  | 
 | ||
|  |   if len(state['$GNGSA']):
 | ||
|  |     gngsa = state['$GNGSA']
 | ||
|  |     gps.horizontalAccuracy = sfloat(gngsa[4])
 | ||
|  |     gps.verticalAccuracy = sfloat(gngsa[5])
 | ||
|  | 
 | ||
|  |   #if len(state['$NAVACC']):
 | ||
|  |   #  # $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A
 | ||
|  |   #  navacc = state['$NAVACC']
 | ||
|  |   #  gps.horizontalAccuracy = sfloat(navacc[3])
 | ||
|  |   #  gps.speedAccuracy = sfloat(navacc[4])
 | ||
|  |   #  gps.bearingAccuracyDeg = sfloat(navacc[5])
 | ||
|  | 
 | ||
|  |   if len(state['$NAVVEL']):
 | ||
|  |     # $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A
 | ||
|  |     navvel = state['$NAVVEL']
 | ||
|  |     vECEF = [
 | ||
|  |       sfloat(navvel[4]),
 | ||
|  |       sfloat(navvel[5]),
 | ||
|  |       sfloat(navvel[6]),
 | ||
|  |     ]
 | ||
|  | 
 | ||
|  |     lat = np.radians(gps.latitude)
 | ||
|  |     lon = np.radians(gps.longitude)
 | ||
|  |     R = np.array([
 | ||
|  |       [-np.sin(lat) * np.cos(lon), -np.sin(lon), -np.cos(lat) * np.cos(lon)],
 | ||
|  |       [-np.sin(lat) * np.sin(lon), np.cos(lon), -np.cos(lat) * np.sin(lon)],
 | ||
|  |       [np.cos(lat), 0, -np.sin(lat)]
 | ||
|  |     ])
 | ||
|  | 
 | ||
|  |     vNED = [float(x) for x in R.dot(vECEF)]
 | ||
|  |     gps.vNED = vNED
 | ||
|  |     gps.speed = np.linalg.norm(vNED)
 | ||
|  | 
 | ||
|  |   # TODO: set these from the module
 | ||
|  |   gps.bearingAccuracyDeg = 5.
 | ||
|  |   gps.speedAccuracy = 3.
 | ||
|  | 
 | ||
|  |   return msg
 | ||
|  | 
 | ||
|  | 
 | ||
|  | @retry(attempts=10, delay=0.1)
 | ||
|  | def setup(u):
 | ||
|  |   at_cmd('AT+CGPS=0')
 | ||
|  |   at_cmd('AT+CGPS=1')
 | ||
|  |   time.sleep(1.0)
 | ||
|  | 
 | ||
|  |   # setup NAVXXX outputs
 | ||
|  |   for i in range(4):
 | ||
|  |     u.send(f"$CFGMSG,1,{i},1")
 | ||
|  |   for i in (1, 3):
 | ||
|  |     u.send(f"$CFGMSG,3,{i},1")
 | ||
|  | 
 | ||
|  |   # 10Hz NAV outputs
 | ||
|  |   u.send("$CFGNAV,100,100,1000")
 | ||
|  | 
 | ||
|  | 
 | ||
|  | def main():
 | ||
|  |   wait_for_modem("AT+CGPS?")
 | ||
|  | 
 | ||
|  |   u = Unicore()
 | ||
|  |   setup(u)
 | ||
|  | 
 | ||
|  |   state = defaultdict(list)
 | ||
|  |   pm = messaging.PubMaster(['gpsLocation'])
 | ||
|  |   while True:
 | ||
|  |     try:
 | ||
|  |       msg = u.recv().decode('utf8').strip()
 | ||
|  |       if "DEBUG" in os.environ:
 | ||
|  |         print(repr(msg))
 | ||
|  | 
 | ||
|  |       if len(msg) > 0:
 | ||
|  |         if checksum(msg) != msg.split('*')[1]:
 | ||
|  |           cloudlog.error(f"invalid checksum: {repr(msg)}")
 | ||
|  |           continue
 | ||
|  | 
 | ||
|  |         k = msg.split(',')[0]
 | ||
|  |         state[k] = msg.split(',')
 | ||
|  |         if '$GNRMC' not in msg:
 | ||
|  |           continue
 | ||
|  | 
 | ||
|  |         pm.send('gpsLocation', build_msg(state))
 | ||
|  |     except Exception:
 | ||
|  |       traceback.print_exc()
 | ||
|  |       cloudlog.exception("gps.issue")
 | ||
|  | 
 | ||
|  | 
 | ||
|  | if __name__ == "__main__":
 | ||
|  |   main()
 |