openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

326 lines
12 KiB

#!/usr/bin/env python
"""Used to Reverse/Test ELM protocol auto detect and OBD message response without a car."""
from __future__ import print_function
import sys
import os
import struct
import binascii
import time
import threading
from collections import deque
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
def lin_checksum(dat):
    """Return the sum of all values in *dat*, reduced modulo 0x100."""
    total = sum(dat)
    return total % 0x100
class ELMCarSimulator():
    """Answer OBD requests on a Panda as if it were a car.

    Two background threads poll the Panda: one reads the K-line/LIN UART and
    one reads the CAN bus. Requests that are recognised are answered with the
    canned values returned by ``_process_obd``. Responses that do not fit in a
    single frame are split into several frames.
    """

    # Canned answers for OBD mode 0x01 ("show current data"), keyed by PID.
    _MODE01_RESPONSES = {
        0x00: b"\xff\xff\xff\xfe",  # List supported things (bitfield)
        0x01: b"\x00\x00\x00\x00",  # Monitor status since DTC cleared (bitfield)
        0x04: b"\x2f",              # Calculated engine load
        0x05: b"\x3c",              # Engine coolant temperature
        0x0B: b"\x90",              # Intake manifold absolute pressure
        0x0C: b"\x1A\xF8",          # Engine RPM
        0x0D: b"\x53",              # Vehicle speed
        0x10: b"\x01\xA0",          # MAF air flow rate
        0x11: b"\x90",              # Throttle position
        0x33: b"\x90",              # Absolute barometric pressure
    }

    def __init__(self, sn, silent=False, can_kbaud=500,
                 can=True, can11b=True, can29b=True,
                 lin=True):
        """Open the Panda and prepare (but do not start) the monitor threads.

        sn        -- Panda serial; when falsy the first entry of
                     ``Panda.list()`` is used.
        silent    -- suppress the per-message diagnostic prints.
        can_kbaud -- CAN speed applied to bus 0 when the CAN thread starts.
        can, lin  -- whether ``start()`` launches the respective thread.
        can11b, can29b -- which CAN address forms are answered
                     (see ``_can_addr_matches``).
        """
        self.__p = Panda(sn if sn else Panda.list()[0])
        self.__on = True
        self.__stop = False
        self.__silent = silent

        self.__lin_timer = None
        self.__lin_active = False
        self.__lin_enable = lin
        self.__lin_monitor_thread = threading.Thread(target=self.__lin_monitor)

        # Remainder of a multi-frame CAN response that is waiting for a
        # flow-control frame; None until the first such response is started.
        self.__can_multipart_data = None
        self.__can_kbaud = can_kbaud
        self.__can_extra_noise_msgs = deque()
        self.__can_enable = can
        self.__can11b = can11b
        self.__can29b = can29b
        self.__can_monitor_thread = threading.Thread(target=self.__can_monitor)

    @property
    def panda(self):
        """The underlying Panda handle."""
        return self.__p

    def stop(self):
        """Ask both monitor loops to exit and end any active LIN session."""
        if self.__lin_timer:
            self.__lin_timer.cancel()
            # The cancelled timer will never fire, so run its handler now to
            # mark the LIN session inactive.
            self.__lin_timeout_handler()

        self.__stop = True

    def join(self):
        """Wait for running monitor threads to finish, then close the Panda."""
        if self.__lin_monitor_thread.is_alive():
            self.__lin_monitor_thread.join()
        if self.__can_monitor_thread.is_alive():
            self.__can_monitor_thread.join()
        if self.__p:
            print("closing handle")
            self.__p.close()

    def set_enable(self, on):
        """Enable or disable processing of incoming CAN messages."""
        self.__on = on

    def start(self):
        """Put the Panda in all-output safety mode and start enabled threads."""
        self.panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
        if self.__lin_enable:
            self.__lin_monitor_thread.start()
        if self.__can_enable:
            self.__can_monitor_thread.start()

    #########################
    # LIN related functions #
    #########################

    def __lin_monitor(self):
        """Thread body: read UART 2 and dispatch complete LIN/K-line messages.

        A session starts when the wakeup sequence is seen. While a session is
        active, bytes are buffered until they form a frame of
        ``format/length, to, from, data..., checksum`` whose checksum matches.
        """
        print("STARTING LIN THREAD")
        self.panda.set_uart_baud(2, 10400)
        self.panda.kline_drain()  # Toss whatever was already there

        lin_buff = bytearray()

        while not self.__stop:
            lin_msg = self.panda.serial_read(2)
            if not lin_msg:
                continue

            lin_buff += lin_msg
            if lin_buff.endswith(b'\x00\xc1\x33\xf1\x81\x66'):  # Leading 0 is wakeup
                lin_buff = bytearray()
                self.__lin_active = True
                print("GOT LIN (KWP FAST) WAKEUP SIGNAL")
                self._lin_send(0x10, b'\xC1\x8F\xE9')
                self.__reset_lin_timeout()
                continue

            if self.__lin_active:
                # Low three bits of the first byte hold the data length.
                msglen = lin_buff[0] & 0x7
                if lin_buff[0] & 0xF8 not in (0x80, 0xC0):
                    # NOTE(review): the buffer is not cleared here, so it keeps
                    # growing until the next wakeup sequence - confirm intended.
                    print("Invalid bytes at start of message")
                    print(" BUFF", lin_buff)
                    continue
                # Format byte + two address bytes + checksum = 4 overhead bytes.
                if len(lin_buff) < msglen + 4:
                    continue
                if lin_checksum(lin_buff[:-1]) != lin_buff[-1]:
                    continue
                self.__lin_process_msg(lin_buff[0] & 0xF8,  # Priority
                                       lin_buff[1], lin_buff[2], lin_buff[3:-1])
                lin_buff = bytearray()

    def _lin_send(self, to_addr, msg):
        """Prefix *msg* with a three-byte header and send it on the K-line.

        The header is ``PHYS_ADDR | len(msg)``, the receiver address 0xF1 and
        *to_addr*.
        """
        if not self.__silent:
            print(" LIN Reply (%x)" % to_addr, binascii.hexlify(msg))

        # Replies always use physical addressing. For reference, 0xC0 is the
        # functional-addressing format and 0x33 the car OBD functional address.
        PHYS_ADDR = 0x80
        RECV = 0xF1
        headers = struct.pack("BBB", PHYS_ADDR | len(msg), RECV, to_addr)
        if not self.__silent:
            print(" Sending LIN", binascii.hexlify(headers+msg),
                  hex(sum(bytearray(headers+msg))%0x100))
        self.panda.kline_send(headers + msg)

    def __reset_lin_timeout(self):
        """(Re)start the 5 second timer that ends the LIN session."""
        if self.__lin_timer:
            self.__lin_timer.cancel()
        self.__lin_timer = threading.Timer(5, self.__lin_timeout_handler)
        self.__lin_timer.start()

    def __lin_timeout_handler(self):
        """Mark the LIN session as inactive and forget the timer."""
        print("LIN TIMEOUT")
        self.__lin_timer = None
        self.__lin_active = False

    @property
    def lin_active(self):
        """True while a LIN session is active (wakeup seen, not timed out)."""
        return self.__lin_active

    def __lin_process_msg(self, priority, toaddr, fromaddr, data):
        """Answer one received LIN message and refresh the session timeout.

        *data* holds the OBD bytes (mode, pid, ...). Messages shorter than two
        bytes, such as the one-byte 0x3E message, only refresh the timeout.
        """
        self.__reset_lin_timeout()

        if not self.__silent and data != b'\x3E':
            print("LIN MSG", "Addr:", hex(toaddr), "obdLen:", len(data),
                  binascii.hexlify(data))

        outmsg = None
        if len(data) > 1:
            outmsg = self._process_obd(data[0], data[1])

        if not outmsg:
            return

        obd_header = struct.pack("BB", 0x40 | data[0], data[1])
        if len(outmsg) <= 5:
            self._lin_send(0x10, obd_header + outmsg)
            return

        # Long answers go out in numbered four-byte pieces. The first piece
        # takes the remainder (left-padded with zeros) so that every following
        # piece is exactly four bytes long.
        first_msg_len = len(outmsg) % 4 or 4
        self._lin_send(0x10, obd_header + b'\x01' +
                       b'\x00'*(4-first_msg_len) +
                       outmsg[:first_msg_len])

        for num, i in enumerate(range(first_msg_len, len(outmsg), 4)):
            self._lin_send(0x10, obd_header +
                           struct.pack('B', (num+2)%0x100) + outmsg[i:i+4])

    #########################
    # CAN related functions #
    #########################

    def __can_monitor(self):
        """Thread body: poll the Panda for CAN frames and dispatch them.

        Only frames received while enabled, with source 0, eight data bytes
        and a first byte of at least 2 are processed; the rest are rejected.
        """
        print("STARTING CAN THREAD")
        self.panda.set_can_speed_kbps(0, self.__can_kbaud)
        self.panda.can_recv()  # Toss whatever was already there

        while not self.__stop:
            for address, ts, data, src in self.panda.can_recv():
                # Compare by value: "is" on an int literal relies on
                # interpreter caching and warns on modern Python.
                if self.__on and src == 0 and len(data) == 8 and data[0] >= 2:
                    if not self.__silent:
                        print("Processing CAN message", src, hex(address), binascii.hexlify(data))
                    self.__can_process_msg(data[1], data[2], address, ts, data, src)
                elif not self.__silent:
                    print("Rejecting CAN message", src, hex(address), binascii.hexlify(data))

    def can_mode_11b(self):
        """Answer only 11-bit CAN addresses."""
        self.__can11b = True
        self.__can29b = False

    def can_mode_29b(self):
        """Answer only 29-bit CAN addresses."""
        self.__can11b = False
        self.__can29b = True

    def can_mode_11b_29b(self):
        """Answer both 11-bit and 29-bit CAN addresses."""
        self.__can11b = True
        self.__can29b = True

    def change_can_baud(self, kbaud):
        """Remember *kbaud* and apply it to CAN bus 0 immediately."""
        self.__can_kbaud = kbaud
        self.panda.set_can_speed_kbps(0, self.__can_kbaud)

    def can_add_extra_noise(self, noise_msg, addr=None):
        """Queue *noise_msg* to be sent right after a future CAN reply.

        When *addr* is None the noise frame reuses the address of the reply
        it follows.
        """
        self.__can_extra_noise_msgs.append((addr, noise_msg))

    def _can_send(self, addr, msg):
        """Send *msg* zero-padded to 8 bytes, then one queued noise frame."""
        if not self.__silent:
            print(" CAN Reply (%x)" % addr, binascii.hexlify(msg))
        self.panda.can_send(addr, msg + b'\x00'*(8-len(msg)), 0)
        if self.__can_extra_noise_msgs:
            noise = self.__can_extra_noise_msgs.popleft()
            self.panda.can_send(noise[0] if noise[0] is not None else addr,
                                noise[1] + b'\x00'*(8-len(noise[1])), 0)

    def _can_addr_matches(self, addr):
        """Return True when *addr* is a request address in an enabled mode.

        11-bit: 0x7DF or 0x7E0-0x7E7. 29-bit: 0x18DB33F1 or 0x18DAxxF1.
        """
        if self.__can11b and (addr == 0x7DF or (addr & 0x7F8) == 0x7E0):
            return True
        if self.__can29b and (addr == 0x18db33f1 or (addr & 0x1FFF00FF) == 0x18da00f1):
            return True
        return False

    def __can_process_msg(self, mode, pid, address, ts, data, src):
        """Answer one CAN request frame.

        A ``30 00 00`` frame releases the pending remainder of a multi-frame
        response, if there is one. Any other frame is treated as an OBD
        request for (*mode*, *pid*). *ts* and *src* are accepted but unused.
        """
        if not self.__silent:
            print("CAN MSG", binascii.hexlify(data[1:1+data[0]]),
                  "Addr:", hex(address), "Mode:", hex(mode)[2:].zfill(2),
                  "PID:", hex(pid)[2:].zfill(2), "canLen:", len(data),
                  binascii.hexlify(data))

        if not (self._can_addr_matches(address) and len(data) == 8):
            return

        outaddr = 0x7E8 if address in (0x7DF, 0x7E0) else 0x18DAF110

        # Truthiness rather than len(): the pending data is None until the
        # first multi-frame response, and len(None) would kill the CAN thread.
        if data[:3] == b'\x30\x00\x00' and self.__can_multipart_data:
            if not self.__silent:
                print("Request for more data")
            msgnum = 1
            while self.__can_multipart_data:
                datalen = min(7, len(self.__can_multipart_data))
                msgpiece = struct.pack("B", 0x20 | msgnum) + self.__can_multipart_data[:datalen]
                self._can_send(outaddr, msgpiece)
                self.__can_multipart_data = self.__can_multipart_data[7:]
                msgnum = (msgnum+1)%0x10
                time.sleep(0.01)
            return

        outmsg = self._process_obd(mode, pid)
        if not outmsg:
            return

        if len(outmsg) <= 5:
            self._can_send(outaddr,
                           struct.pack("BBB", len(outmsg)+2, 0x40|data[1], pid) + outmsg)
            return

        # First frame: 12-bit total length, response mode, pid, item count 1,
        # then the first payload bytes. The rest waits for flow control.
        # NOTE(review): if len(outmsg) % 7 were below 3 the first frame would
        # carry fewer than 3 payload bytes; every built-in response has a
        # remainder of at least 3 - confirm before adding new responses.
        first_msg_len = min(3, len(outmsg)%7)
        payload_len = len(outmsg)+3
        msgpiece = struct.pack("BBBBB", 0x10 | ((payload_len>>8)&0xF),
                               payload_len&0xFF,
                               0x40|data[1], pid, 1) + outmsg[:first_msg_len]
        self._can_send(outaddr, msgpiece)
        self.__can_multipart_data = outmsg[first_msg_len:]

    #########################
    # General OBD functions #
    #########################

    def _process_obd(self, mode, pid):
        """Return the canned response bytes for (*mode*, *pid*).

        Returns None when the request is not one the simulator knows.
        """
        if mode == 0x01:  # Mode: Show current data
            return self._MODE01_RESPONSES.get(pid)

        if mode == 0x09:  # Mode: Request vehicle information
            if pid == 0x02:  # Show VIN
                return b"1D4GP00R55B123456"
            if pid == 0xFC:  # test long multi message. Lined up for LIN responses
                return b''.join((struct.pack(">BBH", 0xAA, 0xAA, num+1) for num in range(80)))
            if pid == 0xFD:  # test long multi message
                parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80))
                return b'\xAA\xAA\xAA' + b''.join(parts)
            if pid == 0xFE:  # test very long multi message
                parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(584))
                return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA'
            if pid == 0xFF:
                return b'\xAA\x00\x00' +\
                    b"".join(((b'\xAA'*5)+struct.pack(">H", num+1) for num in range(584)))

        return None
if __name__ == "__main__":
    import signal

    # Configuration comes from environment variables; an unset or empty
    # variable falls back to the default shown after "or".
    serial = os.getenv("SERIAL") or None
    kbaud = int(os.getenv("CANKBAUD") or 500)
    bitwidth = int(os.getenv("CANBITWIDTH") or 0)
    canenable = bool(int(os.getenv("CANENABLE") or 1))
    linenable = bool(int(os.getenv("LINENABLE") or 1))

    sim = ELMCarSimulator(serial, can_kbaud=kbaud, can=canenable, lin=linenable)

    # 0 answers both address widths; any other unlisted value leaves the
    # simulator's constructor defaults untouched.
    mode_setters = {
        0: sim.can_mode_11b_29b,
        11: sim.can_mode_11b,
        29: sim.can_mode_29b,
    }
    set_mode = mode_setters.get(bitwidth)
    if set_mode is not None:
        set_mode()

    def signal_handler(signum, frame):
        """Stop the simulator cleanly on Ctrl-C and exit."""
        print('\nShutting down simulator')
        sim.stop()
        sim.join()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    sim.start()

    # Sleep until a signal arrives; the handler above performs the shutdown.
    signal.pause()