#!/usr/bin/env python3 import sys import time import signal import serial import struct import requests import urllib.parse from datetime import datetime, UTC from cereal import messaging from openpilot.common.time_helpers import system_time_valid from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog from openpilot.system.hardware import TICI from openpilot.common.gpio import gpio_init, gpio_set from openpilot.system.hardware.tici.pins import GPIO UBLOX_TTY = "/dev/ttyHS0" UBLOX_ACK = b"\xb5\x62\x05\x01\x02\x00" UBLOX_NACK = b"\xb5\x62\x05\x00\x02\x00" UBLOX_SOS_ACK = b"\xb5\x62\x09\x14\x08\x00\x02\x00\x00\x00\x01\x00\x00\x00" UBLOX_SOS_NACK = b"\xb5\x62\x09\x14\x08\x00\x02\x00\x00\x00\x00\x00\x00\x00" UBLOX_BACKUP_RESTORE_MSG = b"\xb5\x62\x09\x14\x08\x00\x03" UBLOX_ASSIST_ACK = b"\xb5\x62\x13\x60\x08\x00" def set_power(enabled: bool) -> None: gpio_init(GPIO.UBLOX_SAFEBOOT_N, True) gpio_init(GPIO.GNSS_PWR_EN, True) gpio_init(GPIO.UBLOX_RST_N, True) gpio_set(GPIO.UBLOX_SAFEBOOT_N, True) gpio_set(GPIO.GNSS_PWR_EN, enabled) gpio_set(GPIO.UBLOX_RST_N, enabled) def add_ubx_checksum(msg: bytes) -> bytes: A = B = 0 for b in msg[2:]: A = (A + b) % 256 B = (B + A) % 256 return msg + bytes([A, B]) def get_assistnow_messages(token: bytes) -> list[bytes]: # make request # TODO: implement adding the last known location r = requests.get("https://online-live2.services.u-blox.com/GetOnlineData.ashx", params=urllib.parse.urlencode({ 'token': token, 'gnss': 'gps,glo', 'datatype': 'eph,alm,aux', }, safe=':,'), timeout=5) assert r.status_code == 200, "Got invalid status code" dat = r.content # split up messages msgs = [] while len(dat) > 0: assert dat[:2] == b"\xB5\x62" msg_len = 6 + (dat[5] << 8 | dat[4]) + 2 msgs.append(dat[:msg_len]) dat = dat[msg_len:] return msgs class TTYPigeon: def __init__(self): self.tty = serial.VTIMESerial(UBLOX_TTY, baudrate=9600, timeout=0) def send(self, dat: bytes) -> None: self.tty.write(dat) def receive(self) -> bytes: dat = b'' while len(dat) < 0x1000: d = self.tty.read(0x40) dat += d if len(d) == 0: break return dat def set_baud(self, baud: int) -> None: self.tty.baudrate = baud def wait_for_ack(self, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK, timeout: float = 0.5) -> bool: dat = b'' st = time.monotonic() while True: dat += self.receive() if ack in dat: cloudlog.debug("Received ACK from ublox") return True elif nack in dat: cloudlog.error("Received NACK from ublox") return False elif time.monotonic() - st > timeout: cloudlog.error("No response from ublox") raise TimeoutError('No response from ublox') time.sleep(0.001) def send_with_ack(self, dat: bytes, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK) -> None: self.send(dat) self.wait_for_ack(ack, nack) def wait_for_backup_restore_status(self, timeout: float = 1.) -> int: dat = b'' st = time.monotonic() while True: dat += self.receive() position = dat.find(UBLOX_BACKUP_RESTORE_MSG) if position >= 0 and len(dat) >= position + 11: return dat[position + 10] elif time.monotonic() - st > timeout: cloudlog.error("No backup restore response from ublox") raise TimeoutError('No response from ublox') time.sleep(0.001) def reset_device(self) -> bool: # deleting the backup does not always work on first try (mostly on second try) for _ in range(5): # device cold start self.send(b"\xb5\x62\x06\x04\x04\x00\xff\xff\x00\x00\x0c\x5d") time.sleep(1) # wait for cold start init_baudrate(self) # clear configuration self.send_with_ack(b"\xb5\x62\x06\x09\x0d\x00\x1f\x1f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17\x71\xd7") # clear flash memory (almanac backup) self.send_with_ack(b"\xB5\x62\x09\x14\x04\x00\x01\x00\x00\x00\x22\xf0") # try restoring backup to verify it got deleted self.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60") # 1: failed to restore, 2: could restore, 3: no backup status = self.wait_for_backup_restore_status() if status == 1 or status == 3: return True return False def init_baudrate(pigeon: TTYPigeon): # ublox default setting on startup is 9600 baudrate pigeon.set_baud(9600) # $PUBX,41,1,0007,0003,460800,0*15\r\n pigeon.send(b"\x24\x50\x55\x42\x58\x2C\x34\x31\x2C\x31\x2C\x30\x30\x30\x37\x2C\x30\x30\x30\x33\x2C\x34\x36\x30\x38\x30\x30\x2C\x30\x2A\x31\x35\x0D\x0A") time.sleep(0.1) pigeon.set_baud(460800) def initialize_pigeon(pigeon: TTYPigeon) -> bool: # try initializing a few times for _ in range(10): try: # setup port config pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x03\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x1E\x7F") pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\x35") pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x01\x00\x00\x00\xC0\x08\x00\x00\x00\x08\x07\x00\x01\x00\x01\x00\x00\x00\x00\x00\xF4\x80") pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x04\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1D\x85") pigeon.send_with_ack(b"\xb5\x62\x06\x00\x00\x00\x06\x18") pigeon.send_with_ack(b"\xb5\x62\x06\x00\x01\x00\x01\x08\x22") pigeon.send_with_ack(b"\xb5\x62\x06\x00\x01\x00\x03\x0A\x24") # UBX-CFG-RATE (0x06 0x08) pigeon.send_with_ack(b"\xB5\x62\x06\x08\x06\x00\x64\x00\x01\x00\x00\x00\x79\x10") # UBX-CFG-NAV5 (0x06 0x24) pigeon.send_with_ack(b"\xB5\x62\x06\x24\x24\x00\x05\x00\x04\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5A\x63") # UBX-CFG-ODO (0x06 0x1E) pigeon.send_with_ack(b"\xB5\x62\x06\x1E\x14\x00\x00\x00\x00\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3C\x37") pigeon.send_with_ack(b"\xB5\x62\x06\x39\x08\x00\xFF\xAD\x62\xAD\x1E\x63\x00\x00\x83\x0C") pigeon.send_with_ack(b"\xB5\x62\x06\x23\x28\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x56\x24") # UBX-CFG-NAV5 (0x06 0x24) pigeon.send_with_ack(b"\xB5\x62\x06\x24\x00\x00\x2A\x84") pigeon.send_with_ack(b"\xB5\x62\x06\x23\x00\x00\x29\x81") pigeon.send_with_ack(b"\xB5\x62\x06\x1E\x00\x00\x24\x72") pigeon.send_with_ack(b"\xB5\x62\x06\x39\x00\x00\x3F\xC3") # UBX-CFG-MSG (set message rate) pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x01\x07\x01\x13\x51") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x02\x15\x01\x22\x70") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x02\x13\x01\x20\x6C") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x0A\x09\x01\x1E\x70") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x0A\x0B\x01\x20\x74") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x01\x35\x01\x41\xAD") cloudlog.debug("pigeon configured") # try restoring almanac backup pigeon.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60") restore_status = pigeon.wait_for_backup_restore_status() if restore_status == 2: cloudlog.warning("almanac backup restored") elif restore_status == 3: cloudlog.warning("no almanac backup found") else: cloudlog.error(f"failed to restore almanac backup, status: {restore_status}") # sending time to ublox if system_time_valid(): t_now = datetime.now(UTC).replace(tzinfo=None) cloudlog.warning("Sending current time to ublox") # UBX-MGA-INI-TIME_UTC msg = add_ubx_checksum(b"\xB5\x62\x13\x40\x18\x00" + struct.pack(" tuple[TTYPigeon, messaging.PubMaster]: pigeon = None # register exit handler signal.signal(signal.SIGINT, lambda sig, frame: deinitialize_and_exit(pigeon)) pm = messaging.PubMaster(['ubloxRaw']) # power cycle ublox set_power(False) time.sleep(0.1) set_power(True) time.sleep(0.5) pigeon = TTYPigeon() return pigeon, pm def run_receiving(pigeon: TTYPigeon, pm: messaging.PubMaster, duration: int = 0): start_time = time.monotonic() def end_condition(): return True if duration == 0 else time.monotonic() - start_time < duration while end_condition(): dat = pigeon.receive() if len(dat) > 0: if dat[0] == 0x00: cloudlog.warning("received invalid data from ublox, re-initing!") init_baudrate(pigeon) initialize_pigeon(pigeon) continue # send out to socket msg = messaging.new_message('ubloxRaw', len(dat), valid=True) msg.ubloxRaw = dat[:] pm.send('ubloxRaw', msg) else: # prevent locking up a CPU core if ublox disconnects time.sleep(0.001) def main(): assert TICI, "unsupported hardware for pigeond" pigeon, pm = create_pigeon() init_baudrate(pigeon) initialize_pigeon(pigeon) # start receiving data run_receiving(pigeon, pm) if __name__ == "__main__": main()