parent
5542bd57a4
commit
8ebfc99b93
1 changed files with 0 additions and 165 deletions
@ -1,165 +0,0 @@ |
||||
#!/usr/bin/env python3 |
||||
import os |
||||
import time |
||||
import traceback |
||||
import serial |
||||
import datetime |
||||
import numpy as np |
||||
from collections import defaultdict |
||||
|
||||
from cereal import log |
||||
import cereal.messaging as messaging |
||||
from openpilot.common.retry import retry |
||||
from openpilot.common.swaglog import cloudlog |
||||
from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem |
||||
|
||||
|
||||
def sfloat(n: str): |
||||
return float(n) if len(n) > 0 else 0 |
||||
|
||||
def checksum(s: str): |
||||
ret = 0 |
||||
for c in s[1:-3]: |
||||
ret ^= ord(c) |
||||
return format(ret, '02X') |
||||
|
||||
class Unicore: |
||||
def __init__(self): |
||||
self.s = serial.Serial('/dev/ttyHS0', 115200) |
||||
self.s.timeout = 1 |
||||
self.s.writeTimeout = 1 |
||||
self.s.newline = b'\r\n' |
||||
|
||||
self.s.flush() |
||||
self.s.reset_input_buffer() |
||||
self.s.reset_output_buffer() |
||||
self.s.read(2048) |
||||
|
||||
def send(self, cmd): |
||||
self.s.write(cmd.encode('utf8') + b'\r') |
||||
resp = self.s.read(2048) |
||||
print(len(resp), cmd, "\n", resp) |
||||
assert b"OK" in resp |
||||
|
||||
def recv(self): |
||||
return self.s.readline() |
||||
|
||||
def build_msg(state): |
||||
""" |
||||
NMEA sentences: |
||||
https://campar.in.tum.de/twiki/pub/Chair/NaviGpsDemon/nmea.html#RMC |
||||
NAV messages: |
||||
https://www.unicorecomm.com/assets/upload/file/UFirebird_Standard_Positioning_Products_Protocol_Specification_CH.pdf |
||||
""" |
||||
|
||||
msg = messaging.new_message('gpsLocation', valid=True) |
||||
gps = msg.gpsLocation |
||||
|
||||
gnrmc = state['$GNRMC'] |
||||
gps.hasFix = gnrmc[1] == 'A' |
||||
gps.source = log.GpsLocationData.SensorSource.unicore |
||||
gps.latitude = (sfloat(gnrmc[3][:2]) + (sfloat(gnrmc[3][2:]) / 60)) * (1 if gnrmc[4] == "N" else -1) |
||||
gps.longitude = (sfloat(gnrmc[5][:3]) + (sfloat(gnrmc[5][3:]) / 60)) * (1 if gnrmc[6] == "E" else -1) |
||||
|
||||
try: |
||||
date = gnrmc[9][:6] |
||||
dt = datetime.datetime.strptime(f"{date} {gnrmc[1]}", '%d%m%y %H%M%S.%f') |
||||
gps.unixTimestampMillis = dt.timestamp()*1e3 |
||||
except Exception: |
||||
pass |
||||
|
||||
gps.bearingDeg = sfloat(gnrmc[8]) |
||||
|
||||
if len(state['$GNGGA']): |
||||
gngga = state['$GNGGA'] |
||||
if gngga[10] == 'M': |
||||
gps.altitude = sfloat(gngga[9]) |
||||
|
||||
if len(state['$GNGSA']): |
||||
gngsa = state['$GNGSA'] |
||||
gps.horizontalAccuracy = sfloat(gngsa[4]) |
||||
gps.verticalAccuracy = sfloat(gngsa[5]) |
||||
|
||||
#if len(state['$NAVACC']): |
||||
# # $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A |
||||
# navacc = state['$NAVACC'] |
||||
# gps.horizontalAccuracy = sfloat(navacc[3]) |
||||
# gps.speedAccuracy = sfloat(navacc[4]) |
||||
# gps.bearingAccuracyDeg = sfloat(navacc[5]) |
||||
|
||||
if len(state['$NAVVEL']): |
||||
# $NAVVEL,264415000,5,3,0.375,0.141,-0.735,-65.450*2A |
||||
navvel = state['$NAVVEL'] |
||||
vECEF = [ |
||||
sfloat(navvel[4]), |
||||
sfloat(navvel[5]), |
||||
sfloat(navvel[6]), |
||||
] |
||||
|
||||
lat = np.radians(gps.latitude) |
||||
lon = np.radians(gps.longitude) |
||||
R = np.array([ |
||||
[-np.sin(lat) * np.cos(lon), -np.sin(lon), -np.cos(lat) * np.cos(lon)], |
||||
[-np.sin(lat) * np.sin(lon), np.cos(lon), -np.cos(lat) * np.sin(lon)], |
||||
[np.cos(lat), 0, -np.sin(lat)] |
||||
]) |
||||
|
||||
vNED = [float(x) for x in R.dot(vECEF)] |
||||
gps.vNED = vNED |
||||
gps.speed = np.linalg.norm(vNED) |
||||
|
||||
# TODO: set these from the module |
||||
gps.bearingAccuracyDeg = 5. |
||||
gps.speedAccuracy = 3. |
||||
|
||||
return msg |
||||
|
||||
|
||||
@retry(attempts=10, delay=0.1) |
||||
def setup(u): |
||||
at_cmd('AT+CGPS=0') |
||||
at_cmd('AT+CGPS=1') |
||||
time.sleep(1.0) |
||||
|
||||
# setup NAVXXX outputs |
||||
for i in range(4): |
||||
u.send(f"$CFGMSG,1,{i},1") |
||||
for i in (1, 3): |
||||
u.send(f"$CFGMSG,3,{i},1") |
||||
|
||||
# 10Hz NAV outputs |
||||
u.send("$CFGNAV,100,100,1000") |
||||
|
||||
|
||||
def main(): |
||||
wait_for_modem("AT+CGPS?") |
||||
|
||||
u = Unicore() |
||||
setup(u) |
||||
|
||||
state = defaultdict(list) |
||||
pm = messaging.PubMaster(['gpsLocation']) |
||||
while True: |
||||
try: |
||||
msg = u.recv().decode('utf8').strip() |
||||
if "DEBUG" in os.environ: |
||||
print(repr(msg)) |
||||
|
||||
if len(msg) > 0: |
||||
if checksum(msg) != msg.split('*')[1]: |
||||
cloudlog.error(f"invalid checksum: {repr(msg)}") |
||||
continue |
||||
|
||||
k = msg.split(',')[0] |
||||
state[k] = msg.split(',') |
||||
if '$GNRMC' not in msg: |
||||
continue |
||||
|
||||
pm.send('gpsLocation', build_msg(state)) |
||||
except Exception: |
||||
traceback.print_exc() |
||||
cloudlog.exception("gps.issue") |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
main() |
Loading…
Reference in new issue