import re import os import struct import bitstring from collections import namedtuple def int_or_float(s): # return number, trying to maintain int format try: return int(s) except ValueError: return float(s) DBCSignal = namedtuple( "DBCSignal", ["name", "start_bit", "size", "is_little_endian", "is_signed", "factor", "offset", "tmin", "tmax", "units"]) class dbc(object): def __init__(self, fn): self.name, _ = os.path.splitext(os.path.basename(fn)) with open(fn) as f: self.txt = f.read().split("\n") self._warned_addresses = set() # regexps from https://github.com/ebroecker/canmatrix/blob/master/canmatrix/importdbc.py bo_regexp = re.compile("^BO\_ (\w+) (\w+) *: (\w+) (\w+)") sg_regexp = re.compile("^SG\_ (\w+) : (\d+)\|(\d+)@(\d+)([\+|\-]) \(([0-9.+\-eE]+),([0-9.+\-eE]+)\) \[([0-9.+\-eE]+)\|([0-9.+\-eE]+)\] \"(.*)\" (.*)") sgm_regexp = re.compile("^SG\_ (\w+) (\w+) *: (\d+)\|(\d+)@(\d+)([\+|\-]) \(([0-9.+\-eE]+),([0-9.+\-eE]+)\) \[([0-9.+\-eE]+)\|([0-9.+\-eE]+)\] \"(.*)\" (.*)") # A dictionary which maps message ids to tuples ((name, size), signals). # name is the ASCII name of the message. # size is the size of the message in bytes. # signals is a list signals contained in the message. # signals is a list of DBCSignal in order of increasing start_bit. self.msgs = {} # lookup to bit reverse each byte self.bits_index = [(i & ~0b111) + ((-i-1) & 0b111) for i in xrange(64)] for l in self.txt: l = l.strip() if l.startswith("BO_ "): # new group dat = bo_regexp.match(l) if dat is None: print "bad BO", l name = dat.group(2) size = int(dat.group(3)) ids = int(dat.group(1), 0) # could be hex self.msgs[ids] = ((name, size), []) if l.startswith("SG_ "): # new signal dat = sg_regexp.match(l) go = 0 if dat is None: dat = sgm_regexp.match(l) go = 1 if dat is None: print "bad SG", l sgname = dat.group(1) start_bit = int(dat.group(go+2)) signal_size = int(dat.group(go+3)) is_little_endian = int(dat.group(go+4))==1 is_signed = dat.group(go+5)=='-' factor = int_or_float(dat.group(go+6)) offset = int_or_float(dat.group(go+7)) tmin = int_or_float(dat.group(go+8)) tmax = int_or_float(dat.group(go+9)) units = dat.group(go+10) self.msgs[ids][1].append( DBCSignal(sgname, start_bit, signal_size, is_little_endian, is_signed, factor, offset, tmin, tmax, units)) for msg in self.msgs.viewvalues(): msg[1].sort(key=lambda x: x.start_bit) def encode(self, msg_id, dd): """Encode a CAN message using the dbc. Inputs: msg_id: The message ID. dd: A dictionary mapping signal name to signal data. """ # TODO: Stop using bitstring, which is super slow. msg_def = self.msgs[msg_id] size = msg_def[0][1] bsf = bitstring.Bits(hex="00"*size) for s in msg_def[1]: ival = dd.get(s.name) if ival is not None: ival = (ival / s.factor) - s.offset ival = int(round(ival)) # should pack this if s.is_little_endian: ss = s.start_bit else: ss = self.bits_index[s.start_bit] if s.is_signed: tbs = bitstring.Bits(int=ival, length=s.size) else: tbs = bitstring.Bits(uint=ival, length=s.size) lpad = bitstring.Bits(bin="0b"+"0"*ss) rpad = bitstring.Bits(bin="0b"+"0"*(8*size-(ss+s.size))) tbs = lpad+tbs+rpad bsf |= tbs return bsf.tobytes() def decode(self, x, arr=None, debug=False): """Decode a CAN message using the dbc. Inputs: x: A collection with elements (address, time, data), where address is the CAN address, time is the bus time, and data is the CAN data as a hex string. arr: Optional list of signals which should be decoded and returned. debug: True to print debugging statements. Returns: A tuple (name, data), where name is the name of the CAN message and data is the decoded result. If arr is None, data is a dict of properties. Otherwise data is a list of the same length as arr. Returns (None, None) if the message could not be decoded. """ if arr is None: out = {} else: out = [None]*len(arr) msg = self.msgs.get(x[0]) if msg is None: if x[0] not in self._warned_addresses: #print("WARNING: Unknown message address {}".format(x[0])) self._warned_addresses.add(x[0]) return None, None name = msg[0][0] if debug: print name blen = 8*len(x[2]) st = x[2].rjust(8, '\x00') le, be = None, None for s in msg[1]: if arr is not None and s[0] not in arr: continue # big or little endian? # see http://vi-firmware.openxcplatform.com/en/master/config/bit-numbering.html if s[3] is False: ss = self.bits_index[s[1]] if be is None: be = struct.unpack(">Q", st)[0] x2_int = be data_bit_pos = (blen - (ss + s[2])) else: if le is None: le = struct.unpack("> data_bit_pos) & ((1 << (s[2])) - 1) if s[4] and (ival & (1<<(s[2]-1))): # signed ival -= (1<