openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

170 lines
4.8 KiB

import os
import sys
from dataclasses import dataclass, fields
from subprocess import check_output, CalledProcessError
from time import sleep
from typing import NoReturn
DEBUG = int(os.environ.get("DEBUG", "0"))
@dataclass
class GnssClockNmeaPort:
# flags bit mask:
# 0x01 = leap_seconds valid
# 0x02 = time_uncertainty_ns valid
# 0x04 = full_bias_ns valid
# 0x08 = bias_ns valid
# 0x10 = bias_uncertainty_ns valid
# 0x20 = drift_nsps valid
# 0x40 = drift_uncertainty_nsps valid
flags: int
leap_seconds: int
time_ns: int
time_uncertainty_ns: int # 1-sigma
full_bias_ns: int
bias_ns: float
bias_uncertainty_ns: float # 1-sigma
drift_nsps: float
drift_uncertainty_nsps: float # 1-sigma
def __post_init__(self):
for field in fields(self):
val = getattr(self, field.name)
setattr(self, field.name, field.type(val) if val else None)
@dataclass
class GnssMeasNmeaPort:
messageCount: int
messageNum: int
svCount: int
# constellation enum:
# 1 = GPS
# 2 = SBAS
# 3 = GLONASS
# 4 = QZSS
# 5 = BEIDOU
# 6 = GALILEO
constellation: int
svId: int
flags: int # always zero
time_offset_ns: int
# state bit mask:
# 0x0001 = CODE LOCK
# 0x0002 = BIT SYNC
# 0x0004 = SUBFRAME SYNC
# 0x0008 = TIME OF WEEK DECODED
# 0x0010 = MSEC AMBIGUOUS
# 0x0020 = SYMBOL SYNC
# 0x0040 = GLONASS STRING SYNC
# 0x0080 = GLONASS TIME OF DAY DECODED
# 0x0100 = BEIDOU D2 BIT SYNC
# 0x0200 = BEIDOU D2 SUBFRAME SYNC
# 0x0400 = GALILEO E1BC CODE LOCK
# 0x0800 = GALILEO E1C 2ND CODE LOCK
# 0x1000 = GALILEO E1B PAGE SYNC
# 0x2000 = GALILEO E1B PAGE SYNC
state: int
time_of_week_ns: int
time_of_week_uncertainty_ns: int # 1-sigma
carrier_to_noise_ratio: float
pseudorange_rate: float
pseudorange_rate_uncertainty: float # 1-sigma
def __post_init__(self):
for field in fields(self):
val = getattr(self, field.name)
setattr(self, field.name, field.type(val) if val else None)
def nmea_checksum_ok(s):
checksum = 0
for i, c in enumerate(s[1:]):
if c == "*":
if i != len(s) - 4: # should be 3rd to last character
print("ERROR: NMEA string does not have checksum delimiter in correct location:", s)
return False
break
checksum ^= ord(c)
else:
print("ERROR: NMEA string does not have checksum delimiter:", s)
return False
return True
def process_nmea_port_messages(device:str="/dev/ttyUSB1") -> NoReturn:
while True:
try:
with open(device, "r") as nmeaport:
for line in nmeaport:
line = line.strip()
if DEBUG:
print(line)
if not line.startswith("$"): # all NMEA messages start with $
continue
if not nmea_checksum_ok(line):
continue
fields = line.split(",")
match fields[0]:
case "$GNCLK":
# fields at end are reserved (not used)
gnss_clock = GnssClockNmeaPort(*fields[1:10]) # type: ignore[arg-type]
print(gnss_clock)
case "$GNMEAS":
# fields at end are reserved (not used)
gnss_meas = GnssMeasNmeaPort(*fields[1:14]) # type: ignore[arg-type]
print(gnss_meas)
except Exception as e:
print(e)
sleep(1)
def main() -> NoReturn:
from openpilot.common.gpio import gpio_init, gpio_set
from openpilot.system.hardware.tici.pins import GPIO
from openpilot.system.sensord.rawgps.rawgpsd import at_cmd
try:
check_output(["pidof", "rawgpsd"])
print("rawgpsd is running, please kill openpilot before running this script! (aborted)")
sys.exit(1)
except CalledProcessError as e:
if e.returncode != 1: # 1 == no process found (boardd not running)
raise e
print("power up antenna ...")
gpio_init(GPIO.GNSS_PWR_EN, True)
gpio_set(GPIO.GNSS_PWR_EN, True)
if b"+QGPS: 0" not in (at_cmd("AT+QGPS?") or b""):
print("stop location tracking ...")
at_cmd("AT+QGPSEND")
if b'+QGPSCFG: "outport",usbnmea' not in (at_cmd('AT+QGPSCFG="outport"') or b""):
print("configure outport ...")
at_cmd('AT+QGPSCFG="outport","usbnmea"') # usbnmea = /dev/ttyUSB1
if b'+QGPSCFG: "gnssrawdata",3,0' not in (at_cmd('AT+QGPSCFG="gnssrawdata"') or b""):
print("configure gnssrawdata ...")
# AT+QGPSCFG="gnssrawdata",<constellation-mask>,<port>'
# <constellation-mask> values:
# 0x01 = GPS
# 0x02 = GLONASS
# 0x04 = BEIDOU
# 0x08 = GALILEO
# 0x10 = QZSS
# <port> values:
# 0 = NMEA port
# 1 = AT port
at_cmd('AT+QGPSCFG="gnssrawdata",3,0') # enable all constellations, output data to NMEA port
print("rebooting ...")
at_cmd('AT+CFUN=1,1')
print("re-run this script when it is back up")
sys.exit(2)
print("starting location tracking ...")
at_cmd("AT+QGPS=1")
process_nmea_port_messages()
if __name__ == "__main__":
main()