openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

94 lines
3.3 KiB

import select
from serial import Serial
from crcmod import mkCrcFun
from struct import pack, unpack_from, calcsize
class ModemDiag:
    """Framed packet link over the serial device ``/dev/ttyUSB0``.

    A packet is one type byte followed by a payload.  On the wire every
    packet carries a little-endian CRC-16, has each 0x7d / 0x7e byte replaced
    by 0x7d followed by that byte XOR 0x20, and is terminated by one 0x7e.
    """

    # CRC-16 built by crcmod for polynomial 0x11021; always called through the
    # class (``ModemDiag.ccitt_crc16``), never through an instance.
    ccitt_crc16 = mkCrcFun(0x11021, initCrc=0, xorOut=0xffff)
    # Byte that introduces an escaped byte inside a frame.
    ESCAPE_CHAR = b'\x7d'
    # Byte that terminates every frame.
    TRAILER_CHAR = b'\x7e'

    def __init__(self):
        """Open the serial port and start with an empty receive backlog."""
        self.serial = self.open_serial()
        # Bytes already read that belong to frames not yet returned by recv().
        self.pend = b''

    def open_serial(self):
        """Open ``/dev/ttyUSB0`` and discard anything buffered on it.

        Returns:
            The opened ``Serial`` object (115200 baud, RTS/CTS and DSR/DTR
            enabled, ``timeout=0``, exclusive access).
        """
        port = Serial(
            "/dev/ttyUSB0",
            baudrate=115200,
            rtscts=True,
            dsrdtr=True,
            timeout=0,
            exclusive=True,
        )
        port.flush()
        port.reset_input_buffer()
        port.reset_output_buffer()
        return port

    def hdlc_encapsulate(self, payload):
        """Turn a raw packet into a wire frame.

        Args:
            payload: Packet bytes (type byte plus payload).

        Returns:
            ``payload`` with its CRC-16 appended, escape and trailer bytes
            escaped, and the trailer byte added at the end.
        """
        esc = self.ESCAPE_CHAR[0]
        end = self.TRAILER_CHAR[0]
        payload += pack('<H', ModemDiag.ccitt_crc16(payload))
        # The escape byte must be escaped first; otherwise the 0x7d emitted
        # while escaping trailer bytes would itself be escaped again.
        payload = payload.replace(self.ESCAPE_CHAR, bytes([esc, esc ^ 0x20]))
        payload = payload.replace(self.TRAILER_CHAR, bytes([esc, end ^ 0x20]))
        return payload + self.TRAILER_CHAR

    def hdlc_decapsulate(self, payload):
        """Recover the raw packet from one wire frame.

        Args:
            payload: A complete frame, including the final trailer byte.

        Returns:
            The unescaped packet without its two CRC bytes.

        Raises:
            AssertionError: If the frame is shorter than three bytes, does
                not end with the trailer byte, or fails the CRC check.
        """
        assert len(payload) >= 3
        assert payload[-1:] == self.TRAILER_CHAR
        esc = self.ESCAPE_CHAR[0]
        escaped_trailer = bytes([esc, self.TRAILER_CHAR[0] ^ 0x20])
        escaped_escape = bytes([esc, esc ^ 0x20])
        body = payload[:-1]
        body = body.replace(escaped_trailer, self.TRAILER_CHAR)
        body = body.replace(escaped_escape, self.ESCAPE_CHAR)
        message, checksum = body[:-2], body[-2:]
        assert checksum == pack('<H', ModemDiag.ccitt_crc16(message))
        return message

    def recv(self):
        """Block until one complete frame is available and decode it.

        Returns:
            A ``(packet_type, packet_payload)`` pair: the first byte of the
            decoded packet as an int, and the remaining bytes.
        """
        # Read in big chunks gated by select():
        # self.serial.read_until makes tons of syscalls!
        chunks = [self.pend]
        while self.TRAILER_CHAR not in chunks[-1]:
            select.select([self.serial.fd], [], [])
            chunks.append(self.serial.read(0x10000))
        # Everything after the first trailer is kept for the next call.
        frame, _, self.pend = b''.join(chunks).partition(self.TRAILER_CHAR)
        message = self.hdlc_decapsulate(frame + self.TRAILER_CHAR)
        return message[0], message[1:]

    def send(self, packet_type, packet_payload):
        """Frame and write one packet.

        Args:
            packet_type: Integer packet type, sent as the first byte.
            packet_payload: Bytes following the type byte.
        """
        packet = bytes([packet_type]) + packet_payload
        self.serial.write(self.hdlc_encapsulate(packet))
# *** end class ***
# Opcode of received packets that send_recv() skips while waiting for a reply.
DIAG_LOG_F = 16
# Packet type setup_logs() uses for its requests and expects on the replies.
DIAG_LOG_CONFIG_F = 115
# Operation code packed into the request that asks for the 16 mask sizes.
LOG_CONFIG_RETRIEVE_ID_RANGES_OP = 1
# Operation code packed into the request that uploads one log mask.
LOG_CONFIG_SET_MASK_OP = 3
# Status value setup_logs() requires in every reply header.
LOG_CONFIG_SUCCESS_S = 0
def send_recv(diag, packet_type, packet_payload):
    """Send one packet and return the first reply that is not a log packet.

    Args:
        diag: Object providing ``send(packet_type, packet_payload)`` and a
            ``recv()`` that returns an ``(opcode, payload)`` pair.
        packet_type: Type of the outgoing packet, forwarded to ``diag.send``.
        packet_payload: Payload of the outgoing packet, forwarded likewise.

    Returns:
        The ``(opcode, payload)`` pair of the first received packet whose
        opcode differs from ``DIAG_LOG_F``.
    """
    diag.send(packet_type, packet_payload)
    # Packets with opcode DIAG_LOG_F that arrive before the reply are dropped.
    opcode, payload = diag.recv()
    while opcode == DIAG_LOG_F:
        opcode, payload = diag.recv()
    return opcode, payload
def setup_logs(diag, types_to_log):
    """Upload log masks so that exactly the requested log codes are enabled.

    First asks the device for its log-mask sizes (16 unsigned 32-bit bit
    counts, indexed by log type).  For every log type with a non-zero size a
    bit mask of that many bits is built, where bit ``i`` is set when
    ``(log_type << 12) | i`` is in ``types_to_log``, and sent with a
    set-mask request.

    Args:
        diag: Transport object handed to ``send_recv``.
        types_to_log: Iterable of integer log codes to enable.

    Raises:
        AssertionError: If any reply has an unexpected opcode, echoes the
            wrong operation, or reports a non-success status.
        struct.error: If a reply is too short to hold the expected fields.
    """
    # Build the lookup once: set membership is O(1), whereas a list would be
    # rescanned for every bit of every mask below.
    wanted = frozenset(types_to_log)
    header_spec = '<3xII'  # 3 pad bytes, then operation and status

    opcode, payload = send_recv(diag, DIAG_LOG_CONFIG_F, pack('<3xI', LOG_CONFIG_RETRIEVE_ID_RANGES_OP))
    # Check the opcode here too, as is done for the set-mask replies; without
    # it an unrelated reply would be parsed as a log-config response.
    assert opcode == DIAG_LOG_CONFIG_F, "unexpected reply opcode %r" % (opcode,)
    operation, status = unpack_from(header_spec, payload)
    assert operation == LOG_CONFIG_RETRIEVE_ID_RANGES_OP
    assert status == LOG_CONFIG_SUCCESS_S

    # One bit count per log type, located right after the reply header.
    log_masks = unpack_from('<16I', payload, calcsize(header_spec))

    for log_type, log_mask_bitsize in enumerate(log_masks):
        if not log_mask_bitsize:
            continue  # this log type has no mask to upload

        # Round the bit count up to whole bytes; all bits start cleared.
        log_mask = bytearray((log_mask_bitsize + 7) // 8)
        type_base = log_type << 12
        for i in range(log_mask_bitsize):
            if (type_base | i) in wanted:
                log_mask[i // 8] |= 1 << (i % 8)

        request = pack('<3xIII', LOG_CONFIG_SET_MASK_OP, log_type, log_mask_bitsize) + bytes(log_mask)
        opcode, payload = send_recv(diag, DIAG_LOG_CONFIG_F, request)
        assert opcode == DIAG_LOG_CONFIG_F, "unexpected reply opcode %r" % (opcode,)
        operation, status = unpack_from(header_spec, payload)
        assert operation == LOG_CONFIG_SET_MASK_OP
        assert status == LOG_CONFIG_SUCCESS_S