#!/usr/bin/env python3
"""Daemon that powers up a u-blox GNSS receiver, configures it over a serial
TTY and republishes everything it sends on the ``ubloxRaw`` socket."""
import sys
import time
import signal
import serial
import struct
import requests
import urllib.parse
from datetime import datetime, UTC

from cereal import messaging
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.system.hardware import TICI
from openpilot.common.gpio import gpio_init, gpio_set
from openpilot.system.hardware.tici.pins import GPIO

UBLOX_TTY = "/dev/ttyHS0"

# Byte patterns searched for in the receive stream to recognise responses.
UBLOX_ACK = b"\xb5\x62\x05\x01\x02\x00"
UBLOX_NACK = b"\xb5\x62\x05\x00\x02\x00"
UBLOX_SOS_ACK = b"\xb5\x62\x09\x14\x08\x00\x02\x00\x00\x00\x01\x00\x00\x00"
UBLOX_SOS_NACK = b"\xb5\x62\x09\x14\x08\x00\x02\x00\x00\x00\x00\x00\x00\x00"
UBLOX_BACKUP_RESTORE_MSG = b"\xb5\x62\x09\x14\x08\x00\x03"
UBLOX_ASSIST_ACK = b"\xb5\x62\x13\x60\x08\x00"

# UBX framing as used by the splitter below: two sync bytes, class, id,
# little-endian 16-bit payload length, payload, two checksum bytes.
_UBX_SYNC = b"\xb5\x62"
_UBX_HEADER_LEN = 6
_UBX_CHECKSUM_LEN = 2

# Configuration frames sent (in this order, each waiting for an ACK) by
# initialize_pigeon(). Payload bytes are opaque here; only the class/id bytes
# at offsets 2-3 are annotated.
_CONFIG_MESSAGES: tuple[bytes, ...] = (
    # setup port config (class/id 0x06 0x00)
    b"\xb5\x62\x06\x00\x14\x00\x03\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x1E\x7F",
    b"\xb5\x62\x06\x00\x14\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\x35",
    b"\xb5\x62\x06\x00\x14\x00\x01\x00\x00\x00\xC0\x08\x00\x00\x00\x08\x07\x00\x01\x00\x01\x00\x00\x00\x00\x00\xF4\x80",
    b"\xb5\x62\x06\x00\x14\x00\x04\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1D\x85",
    b"\xb5\x62\x06\x00\x00\x00\x06\x18",
    b"\xb5\x62\x06\x00\x01\x00\x01\x08\x22",
    b"\xb5\x62\x06\x00\x01\x00\x03\x0A\x24",
    # UBX-CFG-RATE (0x06 0x08)
    b"\xB5\x62\x06\x08\x06\x00\x64\x00\x01\x00\x00\x00\x79\x10",
    # UBX-CFG-NAV5 (0x06 0x24)
    b"\xB5\x62\x06\x24\x24\x00\x05\x00\x04\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5A\x63",
    # UBX-CFG-ODO (0x06 0x1E)
    b"\xB5\x62\x06\x1E\x14\x00\x00\x00\x00\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3C\x37",
    # class/id 0x06 0x39
    b"\xB5\x62\x06\x39\x08\x00\xFF\xAD\x62\xAD\x1E\x63\x00\x00\x83\x0C",
    # class/id 0x06 0x23
    b"\xB5\x62\x06\x23\x28\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x56\x24",
    # zero-length-payload frames for 0x06 0x24 (UBX-CFG-NAV5), 0x23, 0x1E, 0x39
    # NOTE(review): presumably polls of the settings written above - confirm
    b"\xB5\x62\x06\x24\x00\x00\x2A\x84",
    b"\xB5\x62\x06\x23\x00\x00\x29\x81",
    b"\xB5\x62\x06\x1E\x00\x00\x24\x72",
    b"\xB5\x62\x06\x39\x00\x00\x3F\xC3",
    # UBX-CFG-MSG (set message rate)
    b"\xB5\x62\x06\x01\x03\x00\x01\x07\x01\x13\x51",
    b"\xB5\x62\x06\x01\x03\x00\x02\x15\x01\x22\x70",
    b"\xB5\x62\x06\x01\x03\x00\x02\x13\x01\x20\x6C",
    b"\xB5\x62\x06\x01\x03\x00\x0A\x09\x01\x1E\x70",
    b"\xB5\x62\x06\x01\x03\x00\x0A\x0B\x01\x20\x74",
    b"\xB5\x62\x06\x01\x03\x00\x01\x35\x01\x41\xAD",
)


def set_power(enabled: bool) -> None:
    """Initialise the three GNSS control GPIOs and set their levels.

    UBLOX_SAFEBOOT_N is always driven True; GNSS_PWR_EN and UBLOX_RST_N
    both follow ``enabled``.
    """
    # NOTE(review): the second gpio_init argument presumably selects output mode - confirm
    gpio_init(GPIO.UBLOX_SAFEBOOT_N, True)
    gpio_init(GPIO.GNSS_PWR_EN, True)
    gpio_init(GPIO.UBLOX_RST_N, True)

    gpio_set(GPIO.UBLOX_SAFEBOOT_N, True)
    gpio_set(GPIO.GNSS_PWR_EN, enabled)
    gpio_set(GPIO.UBLOX_RST_N, enabled)


def add_ubx_checksum(msg: bytes) -> bytes:
    """Return ``msg`` with its two checksum bytes appended.

    The checksum is a running sum (and a running sum of that sum), both
    modulo 256, over every byte after the first two bytes of ``msg``.
    """
    ck_a = ck_b = 0
    for byte in msg[2:]:
        ck_a = (ck_a + byte) % 256
        ck_b = (ck_b + ck_a) % 256
    return msg + bytes([ck_a, ck_b])


def _split_ubx_messages(dat: bytes) -> list[bytes]:
    """Split a buffer of back-to-back UBX frames into individual frames.

    Raises:
        ValueError: if a frame does not start with the sync bytes, or the
            buffer ends in the middle of a header or a frame.
    """
    msgs = []
    offset = 0
    total = len(dat)
    while offset < total:
        if total - offset < _UBX_HEADER_LEN:
            raise ValueError("truncated UBX header in AssistNow data")
        if dat[offset:offset + 2] != _UBX_SYNC:
            raise ValueError("invalid UBX sync bytes in AssistNow data")

        # payload length is little-endian at header offsets 4-5
        payload_len = dat[offset + 4] | (dat[offset + 5] << 8)
        end = offset + _UBX_HEADER_LEN + payload_len + _UBX_CHECKSUM_LEN
        if end > total:
            raise ValueError("truncated UBX message in AssistNow data")

        msgs.append(dat[offset:end])
        offset = end
    return msgs


def get_assistnow_messages(token: bytes) -> list[bytes]:
    """Download AssistNow Online data and return it as a list of UBX frames.

    Args:
        token: AssistNow service token, passed as the ``token`` query parameter.

    Raises:
        RuntimeError: if the HTTP status is not 200.
        ValueError: if the response body is not a well-formed sequence of
            UBX frames.
        requests.RequestException: on network failure or timeout (5 s).
    """
    # make request
    # TODO: implement adding the last known location
    query = urllib.parse.urlencode({
        'token': token,
        'gnss': 'gps,glo',
        'datatype': 'eph,alm,aux',
    }, safe=':,')
    r = requests.get("https://online-live2.services.u-blox.com/GetOnlineData.ashx", params=query, timeout=5)

    # explicit raise instead of assert so the check survives `python -O`
    if r.status_code != 200:
        raise RuntimeError("Got invalid status code")

    # split up messages
    return _split_ubx_messages(r.content)


class TTYPigeon:
    """Thin wrapper around the serial port the u-blox receiver is attached to."""

    def __init__(self):
        # timeout=0: reads return immediately with whatever is available
        self.tty = serial.VTIMESerial(UBLOX_TTY, baudrate=9600, timeout=0)

    def send(self, dat: bytes) -> None:
        """Write ``dat`` to the serial port."""
        self.tty.write(dat)

    def receive(self) -> bytes:
        """Drain currently available bytes from the port.

        Reads in 0x40-byte chunks until a read returns nothing or at least
        0x1000 bytes have been collected.
        """
        buf = bytearray()
        while len(buf) < 0x1000:
            chunk = self.tty.read(0x40)
            if not chunk:
                break
            buf += chunk
        return bytes(buf)

    def set_baud(self, baud: int) -> None:
        """Change the baud rate of the local serial port."""
        self.tty.baudrate = baud

    def wait_for_ack(self, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK, timeout: float = 0.5) -> bool:
        """Poll the port until ``ack`` or ``nack`` shows up in the received data.

        Returns:
            True if ``ack`` was seen, False if ``nack`` was seen (``ack`` is
            checked first).

        Raises:
            TimeoutError: if neither pattern arrives within ``timeout`` seconds.
        """
        dat = b''
        st = time.monotonic()
        while True:
            dat += self.receive()
            if ack in dat:
                cloudlog.debug("Received ACK from ublox")
                return True
            if nack in dat:
                cloudlog.error("Received NACK from ublox")
                return False
            if time.monotonic() - st > timeout:
                cloudlog.error("No response from ublox")
                raise TimeoutError('No response from ublox')
            time.sleep(0.001)

    def send_with_ack(self, dat: bytes, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK) -> None:
        """Send ``dat`` and wait for a response.

        A NACK is logged by wait_for_ack() but otherwise ignored here; only a
        missing response propagates, as TimeoutError.
        """
        self.send(dat)
        self.wait_for_ack(ack, nack)

    def wait_for_backup_restore_status(self, timeout: float = 1.) -> int:
        """Wait for a backup-restore response and return its status byte.

        The status is the byte at offset 10 from the start of
        UBLOX_BACKUP_RESTORE_MSG in the received stream.

        Raises:
            TimeoutError: if no complete response arrives within ``timeout`` seconds.
        """
        dat = b''
        st = time.monotonic()
        while True:
            dat += self.receive()
            position = dat.find(UBLOX_BACKUP_RESTORE_MSG)
            if position >= 0 and len(dat) >= position + 11:
                return dat[position + 10]
            if time.monotonic() - st > timeout:
                cloudlog.error("No backup restore response from ublox")
                raise TimeoutError('No response from ublox')
            time.sleep(0.001)

    def reset_device(self) -> bool:
        """Cold start the receiver and erase its configuration and almanac backup.

        Returns:
            True once a restore attempt reports that no usable backup is left,
            False if that never happens within five attempts.

        Raises:
            TimeoutError: if the receiver stops responding during an attempt.
        """
        # deleting the backup does not always work on first try (mostly on second try)
        for _ in range(5):
            # device cold start
            self.send(b"\xb5\x62\x06\x04\x04\x00\xff\xff\x00\x00\x0c\x5d")
            time.sleep(1)  # wait for cold start
            init_baudrate(self)

            # clear configuration
            self.send_with_ack(b"\xb5\x62\x06\x09\x0d\x00\x1f\x1f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17\x71\xd7")

            # clear flash memory (almanac backup)
            self.send_with_ack(b"\xB5\x62\x09\x14\x04\x00\x01\x00\x00\x00\x22\xf0")

            # try restoring backup to verify it got deleted
            self.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60")
            # 1: failed to restore, 2: could restore, 3: no backup
            status = self.wait_for_backup_restore_status()
            if status in (1, 3):
                return True
        return False


def init_baudrate(pigeon: TTYPigeon):
    """Switch both the receiver and the local port from 9600 to 460800 baud."""
    # ublox default setting on startup is 9600 baudrate
    pigeon.set_baud(9600)

    # $PUBX,41,1,0007,0003,460800,0*15\r\n
    pigeon.send(b"\x24\x50\x55\x42\x58\x2C\x34\x31\x2C\x31\x2C\x30\x30\x30\x37\x2C\x30\x30\x30\x33\x2C\x34\x36\x30\x38\x30\x30\x2C\x30\x2A\x31\x35\x0D\x0A")
    time.sleep(0.1)
    pigeon.set_baud(460800)


def _send_current_time(pigeon: TTYPigeon) -> None:
    """Send the current UTC time to the receiver if the system clock looks sane."""
    t_now = datetime.now(UTC).replace(tzinfo=None)
    # skip when the clock is obviously unset (before 2021-06-01)
    if t_now < datetime(2021, 6, 1):
        return

    cloudlog.warning("Sending current time to ublox")

    # UBX-MGA-INI-TIME_UTC, 0x18-byte payload; 'x' entries in the format are pad bytes
    payload = struct.pack(
        "<BBBBHBBBBBxIHxxI",
        0x10,
        0x00,
        0x00,
        0x80,
        t_now.year,
        t_now.month,
        t_now.day,
        t_now.hour,
        t_now.minute,
        t_now.second,
        0,
        30,
        0,
    )
    msg = add_ubx_checksum(b"\xB5\x62\x13\x40\x18\x00" + payload)
    pigeon.send_with_ack(msg, ack=UBLOX_ASSIST_ACK)


def _send_assistnow(pigeon: TTYPigeon) -> None:
    """Best-effort upload of AssistNow data when an 'AssistNowToken' param exists."""
    token = Params().get('AssistNowToken')
    if token is None:
        return

    try:
        for msg in get_assistnow_messages(token):
            pigeon.send_with_ack(msg, ack=UBLOX_ASSIST_ACK)
        cloudlog.warning("AssistNow messages sent")
    except Exception:
        # deliberately broad: assistance data is optional, so neither a download
        # failure nor a receiver timeout here may abort or retry initialization
        cloudlog.warning("failed to get AssistNow messages")


def initialize_pigeon(pigeon: TTYPigeon) -> bool:
    """Configure the receiver, retrying up to ten times on timeouts.

    Each attempt sends every frame in _CONFIG_MESSAGES, tries to restore the
    almanac backup, sends the current time and optionally AssistNow data.

    Returns:
        True once an attempt completes, False if all ten attempts timed out.
    """
    # try initializing a few times
    for _ in range(10):
        try:
            for cfg in _CONFIG_MESSAGES:
                pigeon.send_with_ack(cfg)
            cloudlog.debug("pigeon configured")

            # try restoring almanac backup
            pigeon.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60")
            restore_status = pigeon.wait_for_backup_restore_status()
            if restore_status == 2:
                cloudlog.warning("almanac backup restored")
            elif restore_status == 3:
                cloudlog.warning("no almanac backup found")
            else:
                cloudlog.error(f"failed to restore almanac backup, status: {restore_status}")

            # sending time to ublox
            _send_current_time(pigeon)

            # try getting AssistNow if we have a token
            _send_assistnow(pigeon)

            cloudlog.warning("Pigeon GPS on!")
            return True
        except TimeoutError:
            cloudlog.warning("Initialization failed, trying again!")

    cloudlog.warning("Failed to initialize pigeon")
    return False


def deinitialize_and_exit(pigeon: TTYPigeon | None):
    """Ask the receiver to store its almanac, cut GNSS power and exit with status 0.

    ``pigeon`` may be None when shutdown is requested before the serial port
    was opened; in that case only the power-down and exit happen.
    """
    cloudlog.warning("Storing almanac in ublox flash")

    if pigeon is not None:
        # controlled GNSS stop
        pigeon.send(b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74")

        # store almanac in flash
        pigeon.send(b"\xB5\x62\x09\x14\x04\x00\x00\x00\x00\x00\x21\xEC")
        try:
            if pigeon.wait_for_ack(ack=UBLOX_SOS_ACK, nack=UBLOX_SOS_NACK):
                cloudlog.warning("Done storing almanac")
            else:
                cloudlog.error("Error storing almanac")
        except TimeoutError:
            # still power down and exit, but leave a trace instead of failing silently
            cloudlog.warning("Timed out waiting for almanac store response")

    # turn off power and exit cleanly
    set_power(False)
    sys.exit(0)


def create_pigeon() -> tuple[TTYPigeon, messaging.PubMaster]:
    """Install the SIGINT handler, power cycle the receiver and open its port.

    Returns:
        The opened TTYPigeon and a PubMaster for the 'ubloxRaw' service.
    """
    pigeon = None

    # register exit handler; the lambda reads `pigeon` when the signal fires,
    # so it sees None before the port is opened and the real object afterwards
    signal.signal(signal.SIGINT, lambda sig, frame: deinitialize_and_exit(pigeon))
    pm = messaging.PubMaster(['ubloxRaw'])

    # power cycle ublox
    set_power(False)
    time.sleep(0.1)
    set_power(True)
    time.sleep(0.5)

    pigeon = TTYPigeon()
    return pigeon, pm


def run_receiving(pigeon: TTYPigeon, pm: messaging.PubMaster, duration: int = 0):
    """Forward received data to the 'ubloxRaw' socket.

    Args:
        pigeon: opened receiver port.
        pm: publisher that has the 'ubloxRaw' service.
        duration: seconds to run for; 0 means run forever.
    """
    start_time = time.monotonic()

    def end_condition():
        return duration == 0 or time.monotonic() - start_time < duration

    while end_condition():
        dat = pigeon.receive()
        if not dat:
            # prevent locking up a CPU core if ublox disconnects
            time.sleep(0.001)
            continue

        if dat[0] == 0x00:
            cloudlog.warning("received invalid data from ublox, re-initing!")
            init_baudrate(pigeon)
            initialize_pigeon(pigeon)
            continue

        # send out to socket
        msg = messaging.new_message('ubloxRaw', len(dat), valid=True)
        msg.ubloxRaw = dat
        pm.send('ubloxRaw', msg)


def main():
    """Entry point: bring the receiver up and forward its output forever."""
    assert TICI, "unsupported hardware for pigeond"

    pigeon, pm = create_pigeon()

    init_baudrate(pigeon)
    initialize_pigeon(pigeon)

    # start receiving data
    run_receiving(pigeon, pm)


if __name__ == "__main__":
    main()