#!/usr/bin/env python3
# pylint: skip-file
'''
UBlox binary protocol handling

Copyright Andrew Tridgell, October 2012
Released under GNU GPL version 3 or later

WARNING: This code was originally intended for ublox version 7. It has been
adapted to work with ublox version 8; not all functions may work.
'''

import struct
import time

# protocol constants: the two sync bytes that start every binary message
PREAMBLE1 = 0xb5
PREAMBLE2 = 0x62

# message classes
CLASS_NAV = 0x01
CLASS_RXM = 0x02
CLASS_INF = 0x04
CLASS_ACK = 0x05
CLASS_CFG = 0x06
CLASS_MON = 0x0A
CLASS_AID = 0x0B
CLASS_TIM = 0x0D
CLASS_ESF = 0x10

# ACK messages
MSG_ACK_NACK = 0x00
MSG_ACK_ACK = 0x01

# NAV messages (each id is defined exactly once)
MSG_NAV_POSECEF = 0x01
MSG_NAV_POSLLH = 0x02
MSG_NAV_STATUS = 0x03
MSG_NAV_DOP = 0x04
MSG_NAV_SOL = 0x06
MSG_NAV_PVT = 0x07
MSG_NAV_POSUTM = 0x08
MSG_NAV_VELECEF = 0x11
MSG_NAV_VELNED = 0x12
MSG_NAV_TIMEGPS = 0x20
MSG_NAV_TIMEUTC = 0x21
MSG_NAV_CLOCK = 0x22
MSG_NAV_SVINFO = 0x30
MSG_NAV_DGPS = 0x31
MSG_NAV_SBAS = 0x32
MSG_NAV_SAT = 0x35
MSG_NAV_EKFSTATUS = 0x40
MSG_NAV_AOPSTATUS = 0x60

# RXM messages
MSG_RXM_RAW = 0x15
MSG_RXM_SFRB = 0x11
MSG_RXM_SFRBX = 0x13
MSG_RXM_SVSI = 0x20
MSG_RXM_EPH = 0x31
MSG_RXM_ALM = 0x30
MSG_RXM_PMREQ = 0x41

# AID messages
MSG_AID_ALM = 0x30
MSG_AID_EPH = 0x31
MSG_AID_ALPSRV = 0x32
MSG_AID_AOP = 0x33
MSG_AID_DATA = 0x10
MSG_AID_ALP = 0x50
MSG_AID_HUI = 0x02
MSG_AID_INI = 0x01
MSG_AID_REQ = 0x00

# CFG messages
MSG_CFG_PRT = 0x00
MSG_CFG_ANT = 0x13
MSG_CFG_DAT = 0x06
MSG_CFG_EKF = 0x12
MSG_CFG_ESFGWT = 0x29
MSG_CFG_CFG = 0x09
MSG_CFG_USB = 0x1b
MSG_CFG_RATE = 0x08
MSG_CFG_SET_RATE = 0x01
MSG_CFG_NAV5 = 0x24
MSG_CFG_FXN = 0x0E
MSG_CFG_INF = 0x02
MSG_CFG_ITFM = 0x39
MSG_CFG_MSG = 0x01
MSG_CFG_NAVX5 = 0x23
MSG_CFG_NMEA = 0x17
MSG_CFG_NVS = 0x22
MSG_CFG_PM2 = 0x3B
MSG_CFG_PM = 0x32
# misspelled historical name, kept as an alias so existing users keep working
MSG_CFG_ITMF = MSG_CFG_ITFM
MSG_CFG_RINV = 0x34
MSG_CFG_RST = 0x04
MSG_CFG_RXM = 0x11
MSG_CFG_SBAS = 0x16
MSG_CFG_TMODE2 = 0x3D
MSG_CFG_TMODE = 0x1D
MSG_CFG_TPS = 0x31
MSG_CFG_TP = 0x07
MSG_CFG_GNSS = 0x3E
MSG_CFG_ODO = 0x1E

# ESF messages
MSG_ESF_MEAS = 0x02
MSG_ESF_STATUS = 0x10

# INF messages
MSG_INF_DEBUG = 0x04
MSG_INF_ERROR = 0x00
MSG_INF_NOTICE = 0x02
MSG_INF_TEST = 0x03
MSG_INF_WARNING = 0x01

# MON messages
MSG_MON_SCHD = 0x01
MSG_MON_HW = 0x09
MSG_MON_HW2 = 0x0B
MSG_MON_IO = 0x02
MSG_MON_MSGPP = 0x06
MSG_MON_RXBUF = 0x07
MSG_MON_RXR = 0x21
MSG_MON_TXBUF = 0x08
MSG_MON_VER = 0x04

# TIM messages
MSG_TIM_TP = 0x01
MSG_TIM_TM2 = 0x03
MSG_TIM_SVIN = 0x04
MSG_TIM_VRFY = 0x06

# port IDs
PORT_DDC = 0
PORT_SERIAL1 = 1
PORT_SERIAL2 = 2
PORT_USB = 3
PORT_SPI = 4

# dynamic models
DYNAMIC_MODEL_PORTABLE = 0
DYNAMIC_MODEL_STATIONARY = 2
DYNAMIC_MODEL_PEDESTRIAN = 3
DYNAMIC_MODEL_AUTOMOTIVE = 4
DYNAMIC_MODEL_SEA = 5
DYNAMIC_MODEL_AIRBORNE1G = 6
DYNAMIC_MODEL_AIRBORNE2G = 7
DYNAMIC_MODEL_AIRBORNE4G = 8

# reset items
RESET_HOT = 0
RESET_WARM = 1
RESET_COLD = 0xFFFF

RESET_HW = 0
RESET_SW = 1
RESET_SW_GPS = 2
RESET_HW_GRACEFUL = 4
RESET_GPS_STOP = 8
RESET_GPS_START = 9


class UBloxError(Exception):
    '''Ublox error class

    The text is available both through str(exc) and as exc.message.
    '''

    def __init__(self, msg):
        super().__init__(msg)
        self.message = msg


class UBloxAttrDict(dict):
    '''allow dictionary members as attributes'''

    def __init__(self):
        dict.__init__(self)

    def __getattr__(self, name):
        '''look up a missing attribute as a dictionary key'''
        try:
            return self[name]
        except KeyError:
            # callers such as getattr()/hasattr() expect AttributeError; the
            # KeyError is an implementation detail, so drop it from the chain
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        '''store an attribute as a dictionary item'''
        if name in self.__dict__:
            # allow set on normal attributes
            dict.__setattr__(self, name, value)
        else:
            self[name] = value


def ArrayParse(field):
    '''parse an array descriptor

    "name[N]" gives (name, N); a plain "name" gives (name, -1).
    '''
    arridx = field.find('[')
    if arridx == -1:
        return (field, -1)
    # the length is the text between '[' and the final character (the ']')
    return (field[:arridx], int(field[arridx + 1:-1]))
class UBloxDescriptor:
    '''class used to describe the layout of a UBlox message

    msg_format is one or more struct format strings separated by commas; the
    parts after the first are optional trailing blocks.  fields names the
    values of those blocks in order ("name[N]" marks an N element array).
    format2/fields2 describe a record that repeats after the main blocks;
    count_field names the field holding the number of records, or is
    '_remaining' to use as many records as fit in the leftover bytes.
    '''
    # NOTE(review): the reviewed text is cut off inside pack() and resumes in
    # UBloxMessage.debug(); anything this class defined after pack() has to be
    # carried over from the original file.

    def __init__(self, name, msg_format, fields=None, count_field=None, format2=None, fields2=None):
        if fields is None:
            fields = []
        self.name = name
        self.msg_format = msg_format
        self.fields = fields
        self.count_field = count_field
        self.format2 = format2
        self.fields2 = fields2

    def unpack(self, msg):
        '''unpack a UBloxMessage, creating the .fields and ._recs attributes in msg

        Raises UBloxError when the payload is shorter than the layout needs
        or has bytes left over once everything has been decoded.
        '''
        msg._fields = {}

        # unpack main message blocks. A comma separates optional trailing
        # blocks, which are only decoded while payload bytes remain
        formats = self.msg_format.split(',')
        buf = msg._buf[6:-2]  # payload only: skip 6 header bytes and 2 checksum bytes
        count = 0
        msg._recs = []
        field_idx = 0
        for fmt in formats:
            size1 = struct.calcsize(fmt)
            if size1 > len(buf):
                raise UBloxError("%s INVALID_SIZE1=%u" % (self.name, len(buf)))
            f1 = list(struct.unpack(fmt, buf[:size1]))
            i = 0
            while i < len(f1):
                (fieldname, alen) = ArrayParse(self.fields[field_idx])
                field_idx += 1
                if alen == -1:
                    msg._fields[fieldname] = f1[i]
                    if self.count_field == fieldname:
                        count = int(f1[i])
                    i += 1
                else:
                    msg._fields[fieldname] = [0] * alen
                    for a in range(alen):
                        msg._fields[fieldname][a] = f1[i]
                        i += 1
            buf = buf[size1:]
            if len(buf) == 0:
                break

        if self.count_field == '_remaining':
            count = len(buf) // struct.calcsize(self.format2)

        if count == 0:
            msg._unpacked = True
            if len(buf) != 0:
                raise UBloxError("EXTRA_BYTES=%u" % len(buf))
            return

        size2 = struct.calcsize(self.format2)
        for _ in range(count):
            r = UBloxAttrDict()
            if size2 > len(buf):
                raise UBloxError("INVALID_SIZE=%u, " % len(buf))
            f2 = list(struct.unpack(self.format2, buf[:size2]))
            for i, fieldname in enumerate(self.fields2):
                r[fieldname] = f2[i]
            buf = buf[size2:]
            msg._recs.append(r)
        if len(buf) != 0:
            raise UBloxError("EXTRA_BYTES=%u" % len(buf))
        msg._unpacked = True

    def pack(self, msg, msg_class=None, msg_id=None):
        '''pack a UBloxMessage from the .fields and ._recs attributes in msg

        msg_class and msg_id default to the values currently held in msg._buf.
        '''
        # read class/id before the buffer is replaced below
        if msg_class is None:
            msg_class = msg.msg_class()
        if msg_id is None:
            msg_id = msg.msg_id()
        msg._buf = b''

        f1 = []
        for f in self.fields:
            (fieldname, alen) = ArrayParse(f)
            if fieldname not in msg._fields:
                # first missing field ends the message (optional part absent)
                break
            if alen == -1:
                f1.append(msg._fields[fieldname])
            else:
                f1.extend(msg._fields[fieldname][a] for a in range(alen))
        try:
            # try full length message
            fmt = self.msg_format.replace(',', '')
            msg._buf = struct.pack(fmt, *f1)
        except struct.error:
            # try without optional part
            fmt = self.msg_format.split(',')[0]
            msg._buf = struct.pack(fmt, *f1)

        length = len(msg._buf)
        if msg._recs:
            length += len(msg._recs) * struct.calcsize(self.format2)

        # NOTE(review): everything from here to the end of the method was lost
        # from the reviewed text and is rebuilt from the frame layout the rest
        # of the file relies on (sync bytes at 0-1, class at 2, id at 3, a
        # 6 byte header, 2 trailing checksum bytes computed from offset 2 on).
        # Confirm against the original file.
        header = struct.pack('<BBBBH', PREAMBLE1, PREAMBLE2, msg_class, msg_id, length)
        msg._buf = header + msg._buf

        for r in msg._recs:
            msg._buf += struct.pack(self.format2, *[r[f] for f in self.fields2])
        msg._buf += struct.pack('<BB', *msg.checksum(data=msg._buf[2:]))
class UBloxMessage:
    '''UBlox message class - holds a UBX binary message

    The raw frame is kept in self._buf: two sync bytes, class, id, a two
    byte little-endian payload length, the payload and two checksum bytes.
    '''
    # NOTE(review): the start of this class (its constructor and any other
    # members before debug()) and the module-level msg_types table are
    # missing from the reviewed text and must be carried over from the
    # original file.  The methods below read self._buf, self._fields,
    # self._recs and self.debug_level, which are presumably set up there.

    def debug(self, level, msg):
        '''write a debug message'''
        # NOTE(review): method head rebuilt from the surviving tail and the
        # identical UBlox.debug(); confirm against the original file
        if self.debug_level >= level:
            print(msg)

    def unpack(self):
        '''unpack a message

        Returns (fields, recs); raises UBloxError for an invalid frame or an
        unknown message type.
        '''
        if not self.valid():
            raise UBloxError('INVALID MESSAGE')
        key = self.msg_type()
        if key not in msg_types:
            raise UBloxError('Unknown message %s length=%u' % (str(key), len(self._buf)))
        msg_types[key].unpack(self)
        return self._fields, self._recs

    def pack(self):
        '''pack a message'''
        if not self.valid():
            raise UBloxError('INVALID MESSAGE')
        key = self.msg_type()
        if key not in msg_types:
            raise UBloxError('Unknown message %s' % str(key))
        msg_types[key].pack(self)

    def name(self):
        '''return the short string name for a message'''
        if not self.valid():
            raise UBloxError('INVALID MESSAGE')
        key = self.msg_type()
        if key not in msg_types:
            raise UBloxError('Unknown message %s length=%u' % (str(key), len(self._buf)))
        return msg_types[key].name

    def msg_class(self):
        '''return the message class'''
        return self._buf[2]

    def msg_id(self):
        '''return the message id within the class'''
        return self._buf[3]

    def msg_type(self):
        '''return the message type tuple (class, id)'''
        return (self.msg_class(), self.msg_id())

    def msg_length(self):
        '''return the payload length'''
        # NOTE(review): format string and slice were stripped from the
        # reviewed text; rebuilt as the 16 bit length that sits between the
        # id byte (offset 3) and the payload (offset 6)
        (payload_length, ) = struct.unpack('<H', self._buf[4:6])
        return payload_length

    def valid_so_far(self):
        '''check if the message is valid so far'''
        if len(self._buf) > 0 and self._buf[0] != PREAMBLE1:
            return False
        if len(self._buf) > 1 and self._buf[1] != PREAMBLE2:
            self.debug(1, "bad pre2")
            return False
        if self.needed_bytes() == 0 and not self.valid():
            if len(self._buf) > 8:
                self.debug(1, "bad checksum len=%u needed=%u" % (len(self._buf), self.needed_bytes()))
            else:
                self.debug(1, "bad len len=%u needed=%u" % (len(self._buf), self.needed_bytes()))
            return False
        return True

    def add(self, bytes):
        '''add some bytes to a message'''
        self._buf += bytes
        while not self.valid_so_far() and len(self._buf) > 0:
            # handle corrupted streams: drop one byte and try to resync
            self._buf = self._buf[1:]
        if self.needed_bytes() < 0:
            # more data than the frame holds: start over.  Keep the buffer's
            # own type; resetting to a str would make the next += fail
            self._buf = self._buf[:0]

    def checksum(self, data=None):
        '''return a checksum tuple for a message

        data defaults to the frame without its sync bytes and checksum.
        '''
        if data is None:
            data = self._buf[2:-2]
        ck_a = 0
        ck_b = 0
        for i in data:
            ck_a = (ck_a + i) & 0xFF
            ck_b = (ck_b + ck_a) & 0xFF
        return (ck_a, ck_b)

    def valid_checksum(self):
        '''check if the checksum is OK'''
        (ck_a, ck_b) = self.checksum()
        # NOTE(review): the rest of this method and all of needed_bytes()
        # were stripped from the reviewed text; rebuilt from how checksum(),
        # add() and the receive loop use them - confirm against the original
        (ck_a2, ck_b2) = struct.unpack('<BB', self._buf[-2:])
        return ck_a == ck_a2 and ck_b == ck_b2

    def needed_bytes(self):
        '''return number of bytes still needed'''
        if len(self._buf) < 6:
            # length field not received yet: ask for the smallest frame
            return 8 - len(self._buf)
        return self.msg_length() + 8 - len(self._buf)

    def valid(self):
        '''check if a message is valid'''
        return len(self._buf) >= 8 and self.needed_bytes() == 0 and self.valid_checksum()
class UBlox:
    '''talk to a receiver through a device object

    dev is used through read()/write() (or recv()/send() when
    use_sendrecv is set), close(), and seek()/tell() for seek_percent().
    '''
    # NOTE(review): this class continues past the end of the reviewed text.
    # send_message() is cut off mid-statement there and configure_poll() is
    # not visible at all; both are deliberately not reproduced here and must
    # be carried over from the original file unchanged.

    def __init__(self, dev, baudrate):
        self.dev = dev
        self.baudrate = baudrate
        self.use_sendrecv = False
        self.read_only = False
        self.debug_level = 0

        self.logfile = None
        self.log = None
        self.preferred_dynamic_model = None
        self.preferred_usePPP = None
        self.preferred_dgps_timeout = None

    def close(self):
        '''close the device'''
        self.dev.close()
        self.dev = None

    def set_debug(self, debug_level):
        '''set debug level'''
        self.debug_level = debug_level

    def debug(self, level, msg):
        '''write a debug message'''
        if self.debug_level >= level:
            print(msg)

    def set_logfile(self, logfile, append=False):
        '''setup logging to a file

        Any log that is already open is closed first; logfile=None turns
        logging off.
        '''
        if self.log is not None:
            self.log.close()
            self.log = None
        self.logfile = logfile
        if self.logfile is not None:
            mode = 'ab' if append else 'wb'
            self.log = open(self.logfile, mode=mode)

    def set_preferred_dynamic_model(self, model):
        '''set the preferred dynamic model for receiver'''
        self.preferred_dynamic_model = model
        if model is not None:
            # poll the current setting; special_handling() applies the change
            self.configure_poll(CLASS_CFG, MSG_CFG_NAV5)

    def set_preferred_dgps_timeout(self, timeout):
        '''set the preferred DGPS timeout for receiver'''
        self.preferred_dgps_timeout = timeout
        if timeout is not None:
            self.configure_poll(CLASS_CFG, MSG_CFG_NAV5)

    def set_preferred_usePPP(self, usePPP):
        '''set the preferred usePPP setting for the receiver'''
        if usePPP is None:
            self.preferred_usePPP = None
            return
        self.preferred_usePPP = int(usePPP)
        self.configure_poll(CLASS_CFG, MSG_CFG_NAVX5)

    def nmea_checksum(self, msg):
        '''return the XOR of all characters of msg after the first one'''
        cs = 0
        for ch in msg[1:]:
            cs ^= ord(ch)
        return cs

    def write(self, buf):
        '''write some bytes

        Returns whatever the device's send()/write() returns, or None when
        the object is read_only.  str input is encoded before write().
        '''
        if self.read_only:
            return None
        if self.use_sendrecv:
            return self.dev.send(buf)
        if isinstance(buf, str):
            buf = buf.encode()
        return self.dev.write(buf)

    def read(self, n):
        '''read some bytes'''
        if self.use_sendrecv:
            try:
                return self.dev.recv(n)
            except OSError:
                # report "nothing read" as empty bytes, like a normal read
                return b''
        return self.dev.read(n)

    def send_nmea(self, msg):
        '''send msg followed by "*" + two hex digit checksum and CR/LF'''
        if not self.read_only:
            s = msg + "*%02X" % self.nmea_checksum(msg) + "\r\n"
            self.write(s)

    def set_binary(self):
        '''put a UBlox into binary mode using a NMEA string'''
        if not self.read_only:
            print("try set binary at %u" % self.baudrate)
            # same sentence for each of the port ids 0..5
            for port in range(6):
                self.send_nmea("$PUBX,41,%u,0007,0001,%u,0" % (port, self.baudrate))

    def disable_nmea(self):
        ''' stop sending all types of nmea messages '''
        # NOTE(review): the GSV sentence carries 1s where every other
        # sentence carries 0s, which looks like it leaves GSV enabled -
        # confirm whether this is intended before changing it
        self.send_nmea("$PUBX,40,GSV,1,1,1,1,1,0")
        for sentence in ("GGA", "GSA", "VTG", "TXT", "RMC"):
            self.send_nmea("$PUBX,40,%s,0,0,0,0,0,0" % sentence)

    def seek_percent(self, pct):
        '''seek to the given percentage of a file'''
        self.dev.seek(0, 2)
        filesize = self.dev.tell()
        # seek() needs an integer offset; the product is a float
        self.dev.seek(int(pct * 0.01 * filesize))

    def special_handling(self, msg):
        '''handle automatic configuration changes

        When a polled CFG_NAV5 / CFG_NAVX5 reply differs from the preferred
        settings, the changed message is sent back to the receiver.
        '''
        if msg.name() == 'CFG_NAV5':
            msg.unpack()
            sendit = False
            pollit = False
            if self.preferred_dynamic_model is not None and msg.dynModel != self.preferred_dynamic_model:
                msg.dynModel = self.preferred_dynamic_model
                sendit = True
                pollit = True
            if self.preferred_dgps_timeout is not None and msg.dgpsTimeOut != self.preferred_dgps_timeout:
                msg.dgpsTimeOut = self.preferred_dgps_timeout
                self.debug(2, "Setting dgpsTimeOut=%u" % msg.dgpsTimeOut)
                sendit = True
                # we don't re-poll for this one, as some receivers refuse to set it
            if sendit:
                msg.pack()
                self.send(msg)
                if pollit:
                    self.configure_poll(CLASS_CFG, MSG_CFG_NAV5)
        if msg.name() == 'CFG_NAVX5' and self.preferred_usePPP is not None:
            msg.unpack()
            if msg.usePPP != self.preferred_usePPP:
                msg.usePPP = self.preferred_usePPP
                msg.mask = 1 << 13
                msg.pack()
                self.send(msg)
                self.configure_poll(CLASS_CFG, MSG_CFG_NAVX5)

    def receive_message(self, ignore_eof=False):
        '''blocking receive of one ublox message

        Returns the message, or None when the device has no more data and
        ignore_eof is false.  With ignore_eof the call keeps retrying.
        '''
        msg = UBloxMessage()
        while True:
            n = msg.needed_bytes()
            b = self.read(n)
            if not b:
                if ignore_eof:
                    time.sleep(0.01)
                    continue
                if len(msg._buf) > 0:
                    self.debug(1, "dropping %d bytes" % len(msg._buf))
                return None
            msg.add(b)
            if self.log is not None:
                self.log.write(b)
                self.log.flush()
            if msg.valid():
                self.special_handling(msg)
                return msg

    def receive_message_noerror(self, ignore_eof=False):
        '''blocking receive of one ublox message, ignoring errors'''
        try:
            return self.receive_message(ignore_eof=ignore_eof)
        except UBloxError as e:
            print(e)
            return None
        except OSError as e:
            # Occasionally we get hit with 'resource temporarily unavailable'
            # messages here on the serial device, catch them too.
            print(e)
            return None

    def send(self, msg):
        '''send a preformatted ublox message'''
        if not msg.valid():
            self.debug(1, "invalid send")
            return
        if not self.read_only:
            self.write(msg._buf)