You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							165 lines
						
					
					
						
							4.0 KiB
						
					
					
				
			
		
		
	
	
							165 lines
						
					
					
						
							4.0 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import os
 | |
| import time
 | |
| import traceback
 | |
| import serial
 | |
| import datetime
 | |
| import numpy as np
 | |
| from collections import defaultdict
 | |
| 
 | |
| from cereal import log
 | |
| import cereal.messaging as messaging
 | |
| from openpilot.common.retry import retry
 | |
| from openpilot.common.swaglog import cloudlog
 | |
| from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem
 | |
| 
 | |
| 
 | |
| def sfloat(n: str):
 | |
|   return float(n) if len(n) > 0 else 0
 | |
| 
 | |
| def checksum(s: str):
 | |
|   ret = 0
 | |
|   for c in s[1:-3]:
 | |
|     ret ^= ord(c)
 | |
|   return format(ret, '02X')
 | |
| 
 | |
| class Unicore:
 | |
|   def __init__(self):
 | |
|     self.s = serial.Serial('/dev/ttyHS0', 115200)
 | |
|     self.s.timeout = 1
 | |
|     self.s.writeTimeout = 1
 | |
|     self.s.newline = b'\r\n'
 | |
| 
 | |
|     self.s.flush()
 | |
|     self.s.reset_input_buffer()
 | |
|     self.s.reset_output_buffer()
 | |
|     self.s.read(2048)
 | |
| 
 | |
|   def send(self, cmd):
 | |
|     self.s.write(cmd.encode('utf8') + b'\r')
 | |
|     resp = self.s.read(2048)
 | |
|     print(len(resp), cmd, "\n", resp)
 | |
|     assert b"OK" in resp
 | |
| 
 | |
|   def recv(self):
 | |
|     return self.s.readline()
 | |
| 
 | |
| def build_msg(state):
 | |
|   """
 | |
|     NMEA sentences:
 | |
|       https://campar.in.tum.de/twiki/pub/Chair/NaviGpsDemon/nmea.html#RMC
 | |
|     NAV messages:
 | |
|       https://www.unicorecomm.com/assets/upload/file/UFirebird_Standard_Positioning_Products_Protocol_Specification_CH.pdf
 | |
|   """
 | |
| 
 | |
|   msg = messaging.new_message('gpsLocation', valid=True)
 | |
|   gps = msg.gpsLocation
 | |
| 
 | |
|   gnrmc = state['$GNRMC']
 | |
|   gps.hasFix = gnrmc[1] == 'A'
 | |
|   gps.source = log.GpsLocationData.SensorSource.unicore
 | |
|   gps.latitude = (sfloat(gnrmc[3][:2]) + (sfloat(gnrmc[3][2:]) / 60)) * (1 if gnrmc[4] == "N" else -1)
 | |
|   gps.longitude = (sfloat(gnrmc[5][:3]) + (sfloat(gnrmc[5][3:]) / 60)) * (1 if gnrmc[6] == "E" else -1)
 | |
| 
 | |
|   try:
 | |
|     date = gnrmc[9][:6]
 | |
|     dt = datetime.datetime.strptime(f"{date} {gnrmc[1]}", '%d%m%y %H%M%S.%f')
 | |
|     gps.unixTimestampMillis = dt.timestamp()*1e3
 | |
|   except Exception:
 | |
|     pass
 | |
| 
 | |
|   gps.bearingDeg = sfloat(gnrmc[8])
 | |
| 
 | |
|   if len(state['$GNGGA']):
 | |
|     gngga = state['$GNGGA']
 | |
|     if gngga[10] == 'M':
 | |
|       gps.altitude = sfloat(gngga[9])
 | |
| 
 | |
|   if len(state['$GNGSA']):
 | |
|     gngsa = state['$GNGSA']
 | |
|     gps.horizontalAccuracy = sfloat(gngsa[4])
 | |
|     gps.verticalAccuracy = sfloat(gngsa[5])
 | |
| 
 | |
|   #if len(state['$NAVACC']):
 | |
|   #  # $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A
 | |
|   #  navacc = state['$NAVACC']
 | |
|   #  gps.horizontalAccuracy = sfloat(navacc[3])
 | |
|   #  gps.speedAccuracy = sfloat(navacc[4])
 | |
|   #  gps.bearingAccuracyDeg = sfloat(navacc[5])
 | |
| 
 | |
|   if len(state['$NAVVEL']):
 | |
|     # $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A
 | |
|     navvel = state['$NAVVEL']
 | |
|     vECEF = [
 | |
|       sfloat(navvel[4]),
 | |
|       sfloat(navvel[5]),
 | |
|       sfloat(navvel[6]),
 | |
|     ]
 | |
| 
 | |
|     lat = np.radians(gps.latitude)
 | |
|     lon = np.radians(gps.longitude)
 | |
|     R = np.array([
 | |
|       [-np.sin(lat) * np.cos(lon), -np.sin(lon), -np.cos(lat) * np.cos(lon)],
 | |
|       [-np.sin(lat) * np.sin(lon), np.cos(lon), -np.cos(lat) * np.sin(lon)],
 | |
|       [np.cos(lat), 0, -np.sin(lat)]
 | |
|     ])
 | |
| 
 | |
|     vNED = [float(x) for x in R.dot(vECEF)]
 | |
|     gps.vNED = vNED
 | |
|     gps.speed = np.linalg.norm(vNED)
 | |
| 
 | |
|   # TODO: set these from the module
 | |
|   gps.bearingAccuracyDeg = 5.
 | |
|   gps.speedAccuracy = 3.
 | |
| 
 | |
|   return msg
 | |
| 
 | |
| 
 | |
| @retry(attempts=10, delay=0.1)
 | |
| def setup(u):
 | |
|   at_cmd('AT+CGPS=0')
 | |
|   at_cmd('AT+CGPS=1')
 | |
|   time.sleep(1.0)
 | |
| 
 | |
|   # setup NAVXXX outputs
 | |
|   for i in range(4):
 | |
|     u.send(f"$CFGMSG,1,{i},1")
 | |
|   for i in (1, 3):
 | |
|     u.send(f"$CFGMSG,3,{i},1")
 | |
| 
 | |
|   # 10Hz NAV outputs
 | |
|   u.send("$CFGNAV,100,100,1000")
 | |
| 
 | |
| 
 | |
| def main():
 | |
|   wait_for_modem("AT+CGPS?")
 | |
| 
 | |
|   u = Unicore()
 | |
|   setup(u)
 | |
| 
 | |
|   state = defaultdict(list)
 | |
|   pm = messaging.PubMaster(['gpsLocation'])
 | |
|   while True:
 | |
|     try:
 | |
|       msg = u.recv().decode('utf8').strip()
 | |
|       if "DEBUG" in os.environ:
 | |
|         print(repr(msg))
 | |
| 
 | |
|       if len(msg) > 0:
 | |
|         if checksum(msg) != msg.split('*')[1]:
 | |
|           cloudlog.error(f"invalid checksum: {repr(msg)}")
 | |
|           continue
 | |
| 
 | |
|         k = msg.split(',')[0]
 | |
|         state[k] = msg.split(',')
 | |
|         if '$GNRMC' not in msg:
 | |
|           continue
 | |
| 
 | |
|         pm.send('gpsLocation', build_msg(state))
 | |
|     except Exception:
 | |
|       traceback.print_exc()
 | |
|       cloudlog.exception("gps.issue")
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   main()
 | |
| 
 |