#!/usr/bin/env python3 import sys import time import signal import serial import struct import requests import urllib.parse from datetime import datetime from typing import List, Optional from cereal import messaging from common.params import Params from system.swaglog import cloudlog from system.hardware import TICI from common.gpio import gpio_init, gpio_set from system.hardware.tici.pins import GPIO UBLOX_TTY = "/dev/ttyHS0" UBLOX_ACK = b"\xb5\x62\x05\x01\x02\x00" UBLOX_NACK = b"\xb5\x62\x05\x00\x02\x00" UBLOX_SOS_ACK = b"\xb5\x62\x09\x14\x08\x00\x02\x00\x00\x00\x01\x00\x00\x00" UBLOX_SOS_NACK = b"\xb5\x62\x09\x14\x08\x00\x02\x00\x00\x00\x00\x00\x00\x00" UBLOX_BACKUP_RESTORE_MSG = b"\xb5\x62\x09\x14\x08\x00\x03" UBLOX_ASSIST_ACK = b"\xb5\x62\x13\x60\x08\x00" def set_power(enabled: bool) -> None: gpio_init(GPIO.UBLOX_SAFEBOOT_N, True) gpio_init(GPIO.UBLOX_PWR_EN, True) gpio_init(GPIO.UBLOX_RST_N, True) gpio_set(GPIO.UBLOX_SAFEBOOT_N, True) gpio_set(GPIO.UBLOX_PWR_EN, enabled) gpio_set(GPIO.UBLOX_RST_N, enabled) def add_ubx_checksum(msg: bytes) -> bytes: A = B = 0 for b in msg[2:]: A = (A + b) % 256 B = (B + A) % 256 return msg + bytes([A, B]) def get_assistnow_messages(token: bytes) -> List[bytes]: # make request # TODO: implement adding the last known location r = requests.get("https://online-live2.services.u-blox.com/GetOnlineData.ashx", params=urllib.parse.urlencode({ 'token': token, 'gnss': 'gps,glo', 'datatype': 'eph,alm,aux', }, safe=':,'), timeout=5) assert r.status_code == 200, "Got invalid status code" dat = r.content # split up messages msgs = [] while len(dat) > 0: assert dat[:2] == b"\xB5\x62" msg_len = 6 + (dat[5] << 8 | dat[4]) + 2 msgs.append(dat[:msg_len]) dat = dat[msg_len:] return msgs class TTYPigeon(): def __init__(self): self.tty = serial.VTIMESerial(UBLOX_TTY, baudrate=9600, timeout=0) def send(self, dat: bytes) -> None: self.tty.write(dat) def receive(self) -> bytes: dat = b'' while len(dat) < 0x1000: d = self.tty.read(0x40) dat += d if len(d) == 0: break return dat def set_baud(self, baud: int) -> None: self.tty.baudrate = baud def wait_for_ack(self, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK, timeout: float = 0.5) -> bool: dat = b'' st = time.monotonic() while True: dat += self.receive() if ack in dat: cloudlog.debug("Received ACK from ublox") return True elif nack in dat: cloudlog.error("Received NACK from ublox") return False elif time.monotonic() - st > timeout: cloudlog.error("No response from ublox") raise TimeoutError('No response from ublox') time.sleep(0.001) def send_with_ack(self, dat: bytes, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK) -> None: self.send(dat) self.wait_for_ack(ack, nack) def wait_for_backup_restore_status(self, timeout: float = 1.) -> int: dat = b'' st = time.monotonic() while True: dat += self.receive() position = dat.find(UBLOX_BACKUP_RESTORE_MSG) if position >= 0 and len(dat) >= position + 11: return dat[position + 10] elif time.monotonic() - st > timeout: cloudlog.error("No backup restore response from ublox") raise TimeoutError('No response from ublox') time.sleep(0.001) def initialize_pigeon(pigeon: TTYPigeon) -> None: # try initializing a few times for _ in range(10): try: pigeon.set_baud(9600) # up baud rate pigeon.send(b"\x24\x50\x55\x42\x58\x2C\x34\x31\x2C\x31\x2C\x30\x30\x30\x37\x2C\x30\x30\x30\x33\x2C\x34\x36\x30\x38\x30\x30\x2C\x30\x2A\x31\x35\x0D\x0A") time.sleep(0.1) pigeon.set_baud(460800) # other configuration messages pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x03\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x1E\x7F") pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\x35") pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x01\x00\x00\x00\xC0\x08\x00\x00\x00\x08\x07\x00\x01\x00\x01\x00\x00\x00\x00\x00\xF4\x80") pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x04\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1D\x85") pigeon.send_with_ack(b"\xB5\x62\x06\x00\x00\x00\x06\x18") pigeon.send_with_ack(b"\xB5\x62\x06\x00\x01\x00\x01\x08\x22") pigeon.send_with_ack(b"\xB5\x62\x06\x00\x01\x00\x03\x0A\x24") pigeon.send_with_ack(b"\xB5\x62\x06\x08\x06\x00\x64\x00\x01\x00\x00\x00\x79\x10") pigeon.send_with_ack(b"\xB5\x62\x06\x24\x24\x00\x05\x00\x04\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5A\x63") pigeon.send_with_ack(b"\xB5\x62\x06\x1E\x14\x00\x00\x00\x00\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3C\x37") pigeon.send_with_ack(b"\xB5\x62\x06\x39\x08\x00\xFF\xAD\x62\xAD\x1E\x63\x00\x00\x83\x0C") pigeon.send_with_ack(b"\xB5\x62\x06\x23\x28\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x56\x24") pigeon.send_with_ack(b"\xB5\x62\x06\x24\x00\x00\x2A\x84") pigeon.send_with_ack(b"\xB5\x62\x06\x23\x00\x00\x29\x81") pigeon.send_with_ack(b"\xB5\x62\x06\x1E\x00\x00\x24\x72") pigeon.send_with_ack(b"\xB5\x62\x06\x39\x00\x00\x3F\xC3") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x01\x07\x01\x13\x51") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x02\x15\x01\x22\x70") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x02\x13\x01\x20\x6C") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x0A\x09\x01\x1E\x70") pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x0A\x0B\x01\x20\x74") cloudlog.debug("pigeon configured") # try restoring almanac backup pigeon.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60") restore_status = pigeon.wait_for_backup_restore_status() if restore_status == 2: cloudlog.warning("almanac backup restored") elif restore_status == 3: cloudlog.warning("no almanac backup found") else: cloudlog.error(f"failed to restore almanac backup, status: {restore_status}") # sending time to ublox t_now = datetime.utcnow() if t_now >= datetime(2021, 6, 1): cloudlog.warning("Sending current time to ublox") # UBX-MGA-INI-TIME_UTC msg = add_ubx_checksum(b"\xB5\x62\x13\x40\x18\x00" + struct.pack(" 0: if dat[0] == 0x00: cloudlog.warning("received invalid data from ublox, re-initing!") initialize_pigeon(pigeon) continue # send out to socket msg = messaging.new_message('ubloxRaw', len(dat)) msg.ubloxRaw = dat[:] pm.send('ubloxRaw', msg) if __name__ == "__main__": main()