From 4111ac36b2d676e4a8014b8728f3487b5252892d Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Thu, 7 Sep 2023 19:29:34 -0700 Subject: [PATCH] ubloxd: remove unused file (#29829) --- system/ubloxd/tests/ublox.py | 938 ----------------------------------- 1 file changed, 938 deletions(-) delete mode 100755 system/ubloxd/tests/ublox.py diff --git a/system/ubloxd/tests/ublox.py b/system/ubloxd/tests/ublox.py deleted file mode 100755 index 3243500e74..0000000000 --- a/system/ubloxd/tests/ublox.py +++ /dev/null @@ -1,938 +0,0 @@ -#!/usr/bin/env python3 -''' -UBlox binary protocol handling - -Copyright Andrew Tridgell, October 2012 -Released under GNU GPL version 3 or later - -WARNING: This code has originally intended for -ublox version 7, it has been adapted to work -for ublox version 8, not all functions may work. -''' - - -import struct -import time - -# protocol constants -PREAMBLE1 = 0xb5 -PREAMBLE2 = 0x62 - -# message classes -CLASS_NAV = 0x01 -CLASS_RXM = 0x02 -CLASS_INF = 0x04 -CLASS_ACK = 0x05 -CLASS_CFG = 0x06 -CLASS_MON = 0x0A -CLASS_AID = 0x0B -CLASS_TIM = 0x0D -CLASS_ESF = 0x10 - -# ACK messages -MSG_ACK_NACK = 0x00 -MSG_ACK_ACK = 0x01 - -# NAV messages -MSG_NAV_POSECEF = 0x1 -MSG_NAV_POSLLH = 0x2 -MSG_NAV_STATUS = 0x3 -MSG_NAV_DOP = 0x4 -MSG_NAV_SOL = 0x6 -MSG_NAV_PVT = 0x7 -MSG_NAV_POSUTM = 0x8 -MSG_NAV_VELNED = 0x12 -MSG_NAV_VELECEF = 0x11 -MSG_NAV_TIMEGPS = 0x20 -MSG_NAV_TIMEUTC = 0x21 -MSG_NAV_CLOCK = 0x22 -MSG_NAV_SVINFO = 0x30 -MSG_NAV_SAT = 0x35 -MSG_NAV_AOPSTATUS = 0x60 -MSG_NAV_DGPS = 0x31 -MSG_NAV_DOP = 0x04 -MSG_NAV_EKFSTATUS = 0x40 -MSG_NAV_SBAS = 0x32 -MSG_NAV_SOL = 0x06 -MSG_NAV_SAT = 0x35 - -# RXM messages -MSG_RXM_RAW = 0x15 -MSG_RXM_SFRB = 0x11 -MSG_RXM_SFRBX = 0x13 -MSG_RXM_SVSI = 0x20 -MSG_RXM_EPH = 0x31 -MSG_RXM_ALM = 0x30 -MSG_RXM_PMREQ = 0x41 - -# AID messages -MSG_AID_ALM = 0x30 -MSG_AID_EPH = 0x31 -MSG_AID_ALPSRV = 0x32 -MSG_AID_AOP = 0x33 -MSG_AID_DATA = 0x10 -MSG_AID_ALP = 0x50 -MSG_AID_DATA = 0x10 -MSG_AID_HUI = 0x02 -MSG_AID_INI = 0x01 -MSG_AID_REQ = 0x00 - -# CFG messages -MSG_CFG_PRT = 0x00 -MSG_CFG_ANT = 0x13 -MSG_CFG_DAT = 0x06 -MSG_CFG_EKF = 0x12 -MSG_CFG_ESFGWT = 0x29 -MSG_CFG_CFG = 0x09 -MSG_CFG_USB = 0x1b -MSG_CFG_RATE = 0x08 -MSG_CFG_SET_RATE = 0x01 -MSG_CFG_NAV5 = 0x24 -MSG_CFG_FXN = 0x0E -MSG_CFG_INF = 0x02 -MSG_CFG_ITFM = 0x39 -MSG_CFG_MSG = 0x01 -MSG_CFG_NAVX5 = 0x23 -MSG_CFG_NMEA = 0x17 -MSG_CFG_NVS = 0x22 -MSG_CFG_PM2 = 0x3B -MSG_CFG_PM = 0x32 -MSG_CFG_ITMF = 0x39 -MSG_CFG_RINV = 0x34 -MSG_CFG_RST = 0x04 -MSG_CFG_RXM = 0x11 -MSG_CFG_SBAS = 0x16 -MSG_CFG_TMODE2 = 0x3D -MSG_CFG_TMODE = 0x1D -MSG_CFG_TPS = 0x31 -MSG_CFG_TP = 0x07 -MSG_CFG_GNSS = 0x3E -MSG_CFG_ODO = 0x1E - -# ESF messages -MSG_ESF_MEAS = 0x02 -MSG_ESF_STATUS = 0x10 - -# INF messages -MSG_INF_DEBUG = 0x04 -MSG_INF_ERROR = 0x00 -MSG_INF_NOTICE = 0x02 -MSG_INF_TEST = 0x03 -MSG_INF_WARNING = 0x01 - -# MON messages -MSG_MON_SCHD = 0x01 -MSG_MON_HW = 0x09 -MSG_MON_HW2 = 0x0B -MSG_MON_IO = 0x02 -MSG_MON_MSGPP = 0x06 -MSG_MON_RXBUF = 0x07 -MSG_MON_RXR = 0x21 -MSG_MON_TXBUF = 0x08 -MSG_MON_VER = 0x04 - -# TIM messages -MSG_TIM_TP = 0x01 -MSG_TIM_TM2 = 0x03 -MSG_TIM_SVIN = 0x04 -MSG_TIM_VRFY = 0x06 - -# port IDs -PORT_DDC = 0 -PORT_SERIAL1 = 1 -PORT_SERIAL2 = 2 -PORT_USB = 3 -PORT_SPI = 4 - -# dynamic models -DYNAMIC_MODEL_PORTABLE = 0 -DYNAMIC_MODEL_STATIONARY = 2 -DYNAMIC_MODEL_PEDESTRIAN = 3 -DYNAMIC_MODEL_AUTOMOTIVE = 4 -DYNAMIC_MODEL_SEA = 5 -DYNAMIC_MODEL_AIRBORNE1G = 6 -DYNAMIC_MODEL_AIRBORNE2G = 7 -DYNAMIC_MODEL_AIRBORNE4G = 8 - -#reset items -RESET_HOT = 0 -RESET_WARM = 1 -RESET_COLD = 0xFFFF - -RESET_HW = 0 -RESET_SW = 1 -RESET_SW_GPS = 2 -RESET_HW_GRACEFUL = 4 -RESET_GPS_STOP = 8 -RESET_GPS_START = 9 - - -class UBloxError(Exception): - '''Ublox error class''' - - def __init__(self, msg): - Exception.__init__(self, msg) - self.message = msg - - -class UBloxAttrDict(dict): - '''allow dictionary members as attributes''' - - def __init__(self): - dict.__init__(self) - - def __getattr__(self, name): - try: - return self.__getitem__(name) - except KeyError as e: - raise RuntimeError(f"ublock invalid attr: {name}") from e - - def __setattr__(self, name, value): - if name in self.__dict__: - # allow set on normal attributes - dict.__setattr__(self, name, value) - else: - self.__setitem__(name, value) - - -def ArrayParse(field): - '''parse an array descriptor''' - arridx = field.find('[') - if arridx == -1: - return (field, -1) - alen = int(field[arridx + 1:-1]) - fieldname = field[:arridx] - return (fieldname, alen) - - -class UBloxDescriptor: - '''class used to describe the layout of a UBlox message''' - - def __init__(self, - name, - msg_format, - fields=None, - count_field=None, - format2=None, - fields2=None): - if fields is None: - fields = [] - - self.name = name - self.msg_format = msg_format - self.fields = fields - self.count_field = count_field - self.format2 = format2 - self.fields2 = fields2 - - def unpack(self, msg): - '''unpack a UBloxMessage, creating the .fields and ._recs attributes in msg''' - msg._fields = {} - - # unpack main message blocks. A comm - formats = self.msg_format.split(',') - buf = msg._buf[6:-2] - count = 0 - msg._recs = [] - fields = self.fields[:] - - for fmt in formats: - size1 = struct.calcsize(fmt) - if size1 > len(buf): - raise UBloxError("%s INVALID_SIZE1=%u" % (self.name, len(buf))) - f1 = list(struct.unpack(fmt, buf[:size1])) - i = 0 - while i < len(f1): - field = fields.pop(0) - (fieldname, alen) = ArrayParse(field) - if alen == -1: - msg._fields[fieldname] = f1[i] - if self.count_field == fieldname: - count = int(f1[i]) - i += 1 - else: - msg._fields[fieldname] = [0] * alen - for a in range(alen): - msg._fields[fieldname][a] = f1[i] - i += 1 - buf = buf[size1:] - if len(buf) == 0: - break - - if self.count_field == '_remaining': - count = len(buf) // struct.calcsize(self.format2) - - if count == 0: - msg._unpacked = True - if len(buf) != 0: - raise UBloxError("EXTRA_BYTES=%u" % len(buf)) - return - - size2 = struct.calcsize(self.format2) - for _ in range(count): - r = UBloxAttrDict() - if size2 > len(buf): - raise UBloxError("INVALID_SIZE=%u, " % len(buf)) - f2 = list(struct.unpack(self.format2, buf[:size2])) - for i in range(len(self.fields2)): - r[self.fields2[i]] = f2[i] - buf = buf[size2:] - msg._recs.append(r) - if len(buf) != 0: - raise UBloxError("EXTRA_BYTES=%u" % len(buf)) - msg._unpacked = True - - def pack(self, msg, msg_class=None, msg_id=None): - '''pack a UBloxMessage from the .fields and ._recs attributes in msg''' - f1 = [] - if msg_class is None: - msg_class = msg.msg_class() - if msg_id is None: - msg_id = msg.msg_id() - msg._buf = '' - - fields = self.fields[:] - for f in fields: - (fieldname, alen) = ArrayParse(f) - if fieldname not in msg._fields: - break - if alen == -1: - f1.append(msg._fields[fieldname]) - else: - for a in range(alen): - f1.append(msg._fields[fieldname][a]) - try: - # try full length message - fmt = self.msg_format.replace(',', '') - msg._buf = struct.pack(fmt, *tuple(f1)) - except Exception: - # try without optional part - fmt = self.msg_format.split(',')[0] - msg._buf = struct.pack(fmt, *tuple(f1)) - - length = len(msg._buf) - if msg._recs: - length += len(msg._recs) * struct.calcsize(self.format2) - header = struct.pack('= level: - print(msg) - - def unpack(self): - '''unpack a message''' - if not self.valid(): - raise UBloxError('INVALID MESSAGE') - msg_type = self.msg_type() - if msg_type not in msg_types: - raise UBloxError('Unknown message %s length=%u' % (str(msg_type), len(self._buf))) - msg_types[msg_type].unpack(self) - return self._fields, self._recs - - def pack(self): - '''pack a message''' - if not self.valid(): - raise UBloxError('INVALID MESSAGE') - msg_type = self.msg_type() - if msg_type not in msg_types: - raise UBloxError('Unknown message %s' % str(msg_type)) - msg_types[msg_type].pack(self) - - def name(self): - '''return the short string name for a message''' - if not self.valid(): - raise UBloxError('INVALID MESSAGE') - msg_type = self.msg_type() - if msg_type not in msg_types: - raise UBloxError('Unknown message %s length=%u' % (str(msg_types), len(self._buf))) - return msg_types[msg_type].name - - def msg_class(self): - '''return the message class''' - return self._buf[2] - - def msg_id(self): - '''return the message id within the class''' - return self._buf[3] - - def msg_type(self): - '''return the message type tuple (class, id)''' - return (self.msg_class(), self.msg_id()) - - def msg_length(self): - '''return the payload length''' - (payload_length, ) = struct.unpack(' 0 and self._buf[0] != PREAMBLE1: - return False - if len(self._buf) > 1 and self._buf[1] != PREAMBLE2: - self.debug(1, "bad pre2") - return False - if self.needed_bytes() == 0 and not self.valid(): - if len(self._buf) > 8: - self.debug(1, "bad checksum len=%u needed=%u" % (len(self._buf), - self.needed_bytes())) - else: - self.debug(1, "bad len len=%u needed=%u" % (len(self._buf), self.needed_bytes())) - return False - return True - - def add(self, data): - '''add some bytes to a message''' - self._buf += data - while not self.valid_so_far() and len(self._buf) > 0: - '''handle corrupted streams''' - self._buf = self._buf[1:] - if self.needed_bytes() < 0: - self._buf = "" - - def checksum(self, data=None): - '''return a checksum tuple for a message''' - if data is None: - data = self._buf[2:-2] - #cs = 0 - ck_a = 0 - ck_b = 0 - for i in data: - ck_a = (ck_a + i) & 0xFF - ck_b = (ck_b + ck_a) & 0xFF - return (ck_a, ck_b) - - def valid_checksum(self): - '''check if the checksum is OK''' - (ck_a, ck_b) = self.checksum() - #d = self._buf[2:-2] - (ck_a2, ck_b2) = struct.unpack('= 8 and self.needed_bytes() == 0 and self.valid_checksum() - - -class UBlox: - def __init__(self, dev, baudrate): - self.dev = dev - self.baudrate = baudrate - - self.use_sendrecv = False - self.read_only = False - self.debug_level = 0 - - self.logfile = None - self.log = None - self.preferred_dynamic_model = None - self.preferred_usePPP = None - self.preferred_dgps_timeout = None - - def close(self): - '''close the device''' - self.dev.close() - self.dev = None - - def set_debug(self, debug_level): - '''set debug level''' - self.debug_level = debug_level - - def debug(self, level, msg): - '''write a debug message''' - if self.debug_level >= level: - print(msg) - - def set_logfile(self, logfile, append=False): - '''setup logging to a file''' - if self.log is not None: - self.log.close() - self.log = None - self.logfile = logfile - if self.logfile is not None: - if append: - mode = 'ab' - else: - mode = 'wb' - self.log = open(self.logfile, mode=mode) - - def set_preferred_dynamic_model(self, model): - '''set the preferred dynamic model for receiver''' - self.preferred_dynamic_model = model - if model is not None: - self.configure_poll(CLASS_CFG, MSG_CFG_NAV5) - - def set_preferred_dgps_timeout(self, timeout): - '''set the preferred DGPS timeout for receiver''' - self.preferred_dgps_timeout = timeout - if timeout is not None: - self.configure_poll(CLASS_CFG, MSG_CFG_NAV5) - - def set_preferred_usePPP(self, usePPP): - '''set the preferred usePPP setting for the receiver''' - if usePPP is None: - self.preferred_usePPP = None - return - self.preferred_usePPP = int(usePPP) - self.configure_poll(CLASS_CFG, MSG_CFG_NAVX5) - - def nmea_checksum(self, msg): - d = msg[1:] - cs = 0 - for i in d: - cs ^= ord(i) - return cs - - def write(self, buf): - '''write some bytes''' - if not self.read_only: - if self.use_sendrecv: - return self.dev.send(buf) - if isinstance(buf, str): - return self.dev.write(str.encode(buf)) - else: - return self.dev.write(buf) - - def read(self, n): - '''read some bytes''' - if self.use_sendrecv: - try: - return self.dev.recv(n) - except OSError: - return '' - return self.dev.read(n) - - def send_nmea(self, msg): - if not self.read_only: - s = msg + "*%02X" % self.nmea_checksum(msg) + "\r\n" - self.write(s) - - def set_binary(self): - '''put a UBlox into binary mode using a NMEA string''' - if not self.read_only: - print("try set binary at %u" % self.baudrate) - self.send_nmea("$PUBX,41,0,0007,0001,%u,0" % self.baudrate) - self.send_nmea("$PUBX,41,1,0007,0001,%u,0" % self.baudrate) - self.send_nmea("$PUBX,41,2,0007,0001,%u,0" % self.baudrate) - self.send_nmea("$PUBX,41,3,0007,0001,%u,0" % self.baudrate) - self.send_nmea("$PUBX,41,4,0007,0001,%u,0" % self.baudrate) - self.send_nmea("$PUBX,41,5,0007,0001,%u,0" % self.baudrate) - - def disable_nmea(self): - ''' stop sending all types of nmea messages ''' - self.send_nmea("$PUBX,40,GSV,1,1,1,1,1,0") - self.send_nmea("$PUBX,40,GGA,0,0,0,0,0,0") - self.send_nmea("$PUBX,40,GSA,0,0,0,0,0,0") - self.send_nmea("$PUBX,40,VTG,0,0,0,0,0,0") - self.send_nmea("$PUBX,40,TXT,0,0,0,0,0,0") - self.send_nmea("$PUBX,40,RMC,0,0,0,0,0,0") - - def seek_percent(self, pct): - '''seek to the given percentage of a file''' - self.dev.seek(0, 2) - filesize = self.dev.tell() - self.dev.seek(pct * 0.01 * filesize) - - def special_handling(self, msg): - '''handle automatic configuration changes''' - if msg.name() == 'CFG_NAV5': - msg.unpack() - sendit = False - pollit = False - if self.preferred_dynamic_model is not None and msg.dynModel != self.preferred_dynamic_model: - msg.dynModel = self.preferred_dynamic_model - sendit = True - pollit = True - if self.preferred_dgps_timeout is not None and msg.dgpsTimeOut != self.preferred_dgps_timeout: - msg.dgpsTimeOut = self.preferred_dgps_timeout - self.debug(2, "Setting dgpsTimeOut=%u" % msg.dgpsTimeOut) - sendit = True - # we don't re-poll for this one, as some receivers refuse to set it - if sendit: - msg.pack() - self.send(msg) - if pollit: - self.configure_poll(CLASS_CFG, MSG_CFG_NAV5) - if msg.name() == 'CFG_NAVX5' and self.preferred_usePPP is not None: - msg.unpack() - if msg.usePPP != self.preferred_usePPP: - msg.usePPP = self.preferred_usePPP - msg.mask = 1 << 13 - msg.pack() - self.send(msg) - self.configure_poll(CLASS_CFG, MSG_CFG_NAVX5) - - def receive_message(self, ignore_eof=False): - '''blocking receive of one ublox message''' - msg = UBloxMessage() - while True: - n = msg.needed_bytes() - b = self.read(n) - if not b: - if ignore_eof: - time.sleep(0.01) - continue - if len(msg._buf) > 0: - self.debug(1, "dropping %d bytes" % len(msg._buf)) - return None - msg.add(b) - if self.log is not None: - self.log.write(b) - self.log.flush() - if msg.valid(): - self.special_handling(msg) - return msg - - def receive_message_noerror(self, ignore_eof=False): - '''blocking receive of one ublox message, ignoring errors''' - try: - return self.receive_message(ignore_eof=ignore_eof) - except UBloxError as e: - print(e) - return None - except OSError as e: - # Occasionally we get hit with 'resource temporarily unavailable' - # messages here on the serial device, catch them too. - print(e) - return None - - def send(self, msg): - '''send a preformatted ublox message''' - if not msg.valid(): - self.debug(1, "invalid send") - return - if not self.read_only: - self.write(msg._buf) - - def send_message(self, msg_class, msg_id, payload): - '''send a ublox message with class, id and payload''' - msg = UBloxMessage() - msg._buf = struct.pack('