You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					326 lines
				
				12 KiB
			
		
		
			
		
	
	
					326 lines
				
				12 KiB
			| 
								 
											6 years ago
										 
									 | 
							
								#!/usr/bin/env python3
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								"""Used to Reverse/Test ELM protocol auto detect and OBD message response without a car."""
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								import sys
							 | 
						||
| 
								 | 
							
								import os
							 | 
						||
| 
								 | 
							
								import struct
							 | 
						||
| 
								 | 
							
								import binascii
							 | 
						||
| 
								 | 
							
								import time
							 | 
						||
| 
								 | 
							
								import threading
							 | 
						||
| 
								 | 
							
								from collections import deque
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
							 | 
						||
| 
								 | 
							
								from panda import Panda
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def lin_checksum(dat):
							 | 
						||
| 
								 | 
							
								    return sum(dat) % 0x100
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class ELMCarSimulator():
							 | 
						||
| 
								 | 
							
								    def __init__(self, sn, silent=False, can_kbaud=500,
							 | 
						||
| 
								 | 
							
								                 can=True, can11b=True, can29b=True,
							 | 
						||
| 
								 | 
							
								                 lin=True):
							 | 
						||
| 
								 | 
							
								        self.__p = Panda(sn if sn else Panda.list()[0])
							 | 
						||
| 
								 | 
							
								        self.__on = True
							 | 
						||
| 
								 | 
							
								        self.__stop = False
							 | 
						||
| 
								 | 
							
								        self.__silent = silent
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        self.__lin_timer = None
							 | 
						||
| 
								 | 
							
								        self.__lin_active = False
							 | 
						||
| 
								 | 
							
								        self.__lin_enable = lin
							 | 
						||
| 
								 | 
							
								        self.__lin_monitor_thread = threading.Thread(target=self.__lin_monitor)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        self.__can_multipart_data = None
							 | 
						||
| 
								 | 
							
								        self.__can_kbaud = can_kbaud
							 | 
						||
| 
								 | 
							
								        self.__can_extra_noise_msgs = deque()
							 | 
						||
| 
								 | 
							
								        self.__can_enable = can
							 | 
						||
| 
								 | 
							
								        self.__can11b = can11b
							 | 
						||
| 
								 | 
							
								        self.__can29b = can29b
							 | 
						||
| 
								 | 
							
								        self.__can_monitor_thread = threading.Thread(target=self.__can_monitor)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @property
							 | 
						||
| 
								 | 
							
								    def panda(self):
							 | 
						||
| 
								 | 
							
								        return self.__p
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def stop(self):
							 | 
						||
| 
								 | 
							
								        if self.__lin_timer:
							 | 
						||
| 
								 | 
							
								            self.__lin_timer.cancel()
							 | 
						||
| 
								 | 
							
								            self.__lin_timeout_handler()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        self.__stop = True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def join(self):
							 | 
						||
| 
								 | 
							
								        if self.__lin_monitor_thread.is_alive():
							 | 
						||
| 
								 | 
							
								            self.__lin_monitor_thread.join()
							 | 
						||
| 
								 | 
							
								        if self.__can_monitor_thread.is_alive():
							 | 
						||
| 
								 | 
							
								            self.__can_monitor_thread.join()
							 | 
						||
| 
								 | 
							
								        if self.__p:
							 | 
						||
| 
								 | 
							
								            print("closing handle")
							 | 
						||
| 
								 | 
							
								            self.__p.close()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def set_enable(self, on):
							 | 
						||
| 
								 | 
							
								        self.__on = on
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def start(self):
							 | 
						||
| 
								 | 
							
								        self.panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
							 | 
						||
| 
								 | 
							
								        if self.__lin_enable:
							 | 
						||
| 
								 | 
							
								            self.__lin_monitor_thread.start()
							 | 
						||
| 
								 | 
							
								        if self.__can_enable:
							 | 
						||
| 
								 | 
							
								            self.__can_monitor_thread.start()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    #########################
							 | 
						||
| 
								 | 
							
								    # LIN related functions #
							 | 
						||
| 
								 | 
							
								    #########################
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __lin_monitor(self):
							 | 
						||
| 
								 | 
							
								        print("STARTING LIN THREAD")
							 | 
						||
| 
								 | 
							
								        self.panda.set_uart_baud(2, 10400)
							 | 
						||
| 
								 | 
							
								        self.panda.kline_drain() # Toss whatever was already there
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        lin_buff = bytearray()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        while not self.__stop:
							 | 
						||
| 
								 | 
							
								            lin_msg = self.panda.serial_read(2)
							 | 
						||
| 
								 | 
							
								            if not lin_msg:
							 | 
						||
| 
								 | 
							
								                continue
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            lin_buff += lin_msg
							 | 
						||
| 
								 | 
							
								            #print("    ** Buff", lin_buff)
							 | 
						||
| 
								 | 
							
								            if lin_buff.endswith(b'\x00\xc1\x33\xf1\x81\x66'): # Leading 0 is wakeup
							 | 
						||
| 
								 | 
							
								                lin_buff = bytearray()
							 | 
						||
| 
								 | 
							
								                self.__lin_active = True
							 | 
						||
| 
								 | 
							
								                print("GOT LIN (KWP FAST) WAKEUP SIGNAL")
							 | 
						||
| 
								 | 
							
								                self._lin_send(0x10, b'\xC1\x8F\xE9')
							 | 
						||
| 
								 | 
							
								                self.__reset_lin_timeout()
							 | 
						||
| 
								 | 
							
								                continue
							 | 
						||
| 
								 | 
							
								            if self.__lin_active:
							 | 
						||
| 
								 | 
							
								                msglen = lin_buff[0] & 0x7
							 | 
						||
| 
								 | 
							
								                if lin_buff[0] & 0xF8 not in (0x80, 0xC0):
							 | 
						||
| 
								 | 
							
								                    print("Invalid bytes at start of message")
							 | 
						||
| 
								 | 
							
								                    print("    BUFF", lin_buff)
							 | 
						||
| 
								 | 
							
								                    continue
							 | 
						||
| 
								 | 
							
								                if len(lin_buff) < msglen + 4: continue
							 | 
						||
| 
								 | 
							
								                if lin_checksum(lin_buff[:-1]) != lin_buff[-1]: continue
							 | 
						||
| 
								 | 
							
								                self.__lin_process_msg(lin_buff[0] & 0xF8, #Priority
							 | 
						||
| 
								 | 
							
								                                       lin_buff[1], lin_buff[2], lin_buff[3:-1])
							 | 
						||
| 
								 | 
							
								                lin_buff = bytearray()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def _lin_send(self, to_addr, msg):
							 | 
						||
| 
								 | 
							
								        if not self.__silent:
							 | 
						||
| 
								 | 
							
								            print("    LIN Reply (%x)" % to_addr, binascii.hexlify(msg))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        PHYS_ADDR = 0x80
							 | 
						||
| 
								 | 
							
								        FUNC_ADDR = 0xC0
							 | 
						||
| 
								 | 
							
								        RECV = 0xF1
							 | 
						||
| 
								 | 
							
								        SEND = 0x33 # Car OBD Functional Address
							 | 
						||
| 
								 | 
							
								        headers = struct.pack("BBB", PHYS_ADDR | len(msg), RECV, to_addr)
							 | 
						||
| 
								 | 
							
								        if not self.__silent:
							 | 
						||
| 
								 | 
							
								            print("    Sending LIN", binascii.hexlify(headers+msg),
							 | 
						||
| 
								 | 
							
								                  hex(sum(bytearray(headers+msg))%0x100))
							 | 
						||
| 
								 | 
							
								        self.panda.kline_send(headers + msg)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __reset_lin_timeout(self):
							 | 
						||
| 
								 | 
							
								        if self.__lin_timer:
							 | 
						||
| 
								 | 
							
								            self.__lin_timer.cancel()
							 | 
						||
| 
								 | 
							
								        self.__lin_timer = threading.Timer(5, self.__lin_timeout_handler)
							 | 
						||
| 
								 | 
							
								        self.__lin_timer.start()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __lin_timeout_handler(self):
							 | 
						||
| 
								 | 
							
								        print("LIN TIMEOUT")
							 | 
						||
| 
								 | 
							
								        self.__lin_timer = None
							 | 
						||
| 
								 | 
							
								        self.__lin_active = False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @property
							 | 
						||
| 
								 | 
							
								    def lin_active(self):
							 | 
						||
| 
								 | 
							
								        return self.__lin_active
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __lin_process_msg(self, priority, toaddr, fromaddr, data):
							 | 
						||
| 
								 | 
							
								        self.__reset_lin_timeout()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if not self.__silent and data != b'\x3E':
							 | 
						||
| 
								 | 
							
								            print("LIN MSG", "Addr:", hex(toaddr), "obdLen:", len(data),
							 | 
						||
| 
								 | 
							
								                  binascii.hexlify(data))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        outmsg = None
							 | 
						||
| 
								 | 
							
								        #if data == b'\x3E':
							 | 
						||
| 
								 | 
							
								        #    print("KEEP ALIVE")
							 | 
						||
| 
								 | 
							
								        #el
							 | 
						||
| 
								 | 
							
								        if len(data) > 1:
							 | 
						||
| 
								 | 
							
								            outmsg = self._process_obd(data[0], data[1])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if outmsg:
							 | 
						||
| 
								 | 
							
								            obd_header = struct.pack("BB", 0x40 | data[0], data[1])
							 | 
						||
| 
								 | 
							
								            if len(outmsg) <= 5:
							 | 
						||
| 
								 | 
							
								                self._lin_send(0x10, obd_header + outmsg)
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								                first_msg_len = min(4, len(outmsg)%4) or 4
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								                self._lin_send(0x10, obd_header + b'\x01' +
							 | 
						||
| 
								 | 
							
								                               b'\x00'*(4-first_msg_len) +
							 | 
						||
| 
								 | 
							
								                               outmsg[:first_msg_len])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                for num, i in enumerate(range(first_msg_len, len(outmsg), 4)):
							 | 
						||
| 
								 | 
							
								                    self._lin_send(0x10, obd_header +
							 | 
						||
| 
								 | 
							
								                                   struct.pack('B', (num+2)%0x100) + outmsg[i:i+4])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    #########################
							 | 
						||
| 
								 | 
							
								    # CAN related functions #
							 | 
						||
| 
								 | 
							
								    #########################
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __can_monitor(self):
							 | 
						||
| 
								 | 
							
								        print("STARTING CAN THREAD")
							 | 
						||
| 
								 | 
							
								        self.panda.set_can_speed_kbps(0, self.__can_kbaud)
							 | 
						||
| 
								 | 
							
								        self.panda.can_recv() # Toss whatever was already there
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        while not self.__stop:
							 | 
						||
| 
								 | 
							
								            for address, ts, data, src in self.panda.can_recv():
							 | 
						||
| 
								 | 
							
								                if self.__on and src is 0 and len(data) == 8 and data[0] >= 2:
							 | 
						||
| 
								 | 
							
								                    if not self.__silent:
							 | 
						||
| 
								 | 
							
								                        print("Processing CAN message", src, hex(address), binascii.hexlify(data))
							 | 
						||
| 
								 | 
							
								                    self.__can_process_msg(data[1], data[2], address, ts, data, src)
							 | 
						||
| 
								 | 
							
								                elif not self.__silent:
							 | 
						||
| 
								 | 
							
								                    print("Rejecting CAN message", src, hex(address), binascii.hexlify(data))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def can_mode_11b(self):
							 | 
						||
| 
								 | 
							
								        self.__can11b = True
							 | 
						||
| 
								 | 
							
								        self.__can29b = False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def can_mode_29b(self):
							 | 
						||
| 
								 | 
							
								        self.__can11b = False
							 | 
						||
| 
								 | 
							
								        self.__can29b = True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def can_mode_11b_29b(self):
							 | 
						||
| 
								 | 
							
								        self.__can11b = True
							 | 
						||
| 
								 | 
							
								        self.__can29b = True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def change_can_baud(self, kbaud):
							 | 
						||
| 
								 | 
							
								        self.__can_kbaud = kbaud
							 | 
						||
| 
								 | 
							
								        self.panda.set_can_speed_kbps(0, self.__can_kbaud)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def can_add_extra_noise(self, noise_msg, addr=None):
							 | 
						||
| 
								 | 
							
								        self.__can_extra_noise_msgs.append((addr, noise_msg))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def _can_send(self, addr, msg):
							 | 
						||
| 
								 | 
							
								        if not self.__silent:
							 | 
						||
| 
								 | 
							
								            print("    CAN Reply (%x)" % addr, binascii.hexlify(msg))
							 | 
						||
| 
								 | 
							
								        self.panda.can_send(addr, msg + b'\x00'*(8-len(msg)), 0)
							 | 
						||
| 
								 | 
							
								        if self.__can_extra_noise_msgs:
							 | 
						||
| 
								 | 
							
								            noise = self.__can_extra_noise_msgs.popleft()
							 | 
						||
| 
								 | 
							
								            self.panda.can_send(noise[0] if noise[0] is not None else addr,
							 | 
						||
| 
								 | 
							
								                             noise[1] + b'\x00'*(8-len(noise[1])), 0)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def _can_addr_matches(self, addr):
							 | 
						||
| 
								 | 
							
								        if self.__can11b and (addr == 0x7DF or (addr & 0x7F8) == 0x7E0):
							 | 
						||
| 
								 | 
							
								            return True
							 | 
						||
| 
								 | 
							
								        if self.__can29b and (addr == 0x18db33f1 or (addr & 0x1FFF00FF) == 0x18da00f1):
							 | 
						||
| 
								 | 
							
								            return True
							 | 
						||
| 
								 | 
							
								        return False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __can_process_msg(self, mode, pid, address, ts, data, src):
							 | 
						||
| 
								 | 
							
								        if not self.__silent:
							 | 
						||
| 
								 | 
							
								            print("CAN MSG", binascii.hexlify(data[1:1+data[0]]),
							 | 
						||
| 
								 | 
							
								                  "Addr:", hex(address), "Mode:", hex(mode)[2:].zfill(2),
							 | 
						||
| 
								 | 
							
								                  "PID:", hex(pid)[2:].zfill(2), "canLen:", len(data),
							 | 
						||
| 
								 | 
							
								                  binascii.hexlify(data))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if self._can_addr_matches(address) and len(data) == 8:
							 | 
						||
| 
								 | 
							
								            outmsg = None
							 | 
						||
| 
								 | 
							
								            if data[:3] == b'\x30\x00\x00' and len(self.__can_multipart_data):
							 | 
						||
| 
								 | 
							
								                if not self.__silent:
							 | 
						||
| 
								 | 
							
								                    print("Request for more data");
							 | 
						||
| 
								 | 
							
								                outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110
							 | 
						||
| 
								 | 
							
								                msgnum = 1
							 | 
						||
| 
								 | 
							
								                while(self.__can_multipart_data):
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								                    datalen = min(7, len(self.__can_multipart_data))
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								                    msgpiece = struct.pack("B", 0x20 | msgnum) + self.__can_multipart_data[:datalen]
							 | 
						||
| 
								 | 
							
								                    self._can_send(outaddr, msgpiece)
							 | 
						||
| 
								 | 
							
								                    self.__can_multipart_data = self.__can_multipart_data[7:]
							 | 
						||
| 
								 | 
							
								                    msgnum = (msgnum+1)%0x10
							 | 
						||
| 
								 | 
							
								                    time.sleep(0.01)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 | 
							
								                outmsg = self._process_obd(mode, pid)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            if outmsg:
							 | 
						||
| 
								 | 
							
								                outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                if len(outmsg) <= 5:
							 | 
						||
| 
								 | 
							
								                    self._can_send(outaddr,
							 | 
						||
| 
								 | 
							
								                                   struct.pack("BBB", len(outmsg)+2, 0x40|data[1], pid) + outmsg)
							 | 
						||
| 
								 | 
							
								                else:
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								                    first_msg_len = min(3, len(outmsg)%7)
							 | 
						||
| 
								 
											6 years ago
										 
									 | 
							
								                    payload_len = len(outmsg)+3
							 | 
						||
| 
								 | 
							
								                    msgpiece = struct.pack("BBBBB", 0x10 | ((payload_len>>8)&0xF),
							 | 
						||
| 
								 | 
							
								                                           payload_len&0xFF,
							 | 
						||
| 
								 | 
							
								                                           0x40|data[1], pid, 1) + outmsg[:first_msg_len]
							 | 
						||
| 
								 | 
							
								                    self._can_send(outaddr, msgpiece)
							 | 
						||
| 
								 | 
							
								                    self.__can_multipart_data = outmsg[first_msg_len:]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    #########################
							 | 
						||
| 
								 | 
							
								    # General OBD functions #
							 | 
						||
| 
								 | 
							
								    #########################
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def _process_obd(self, mode, pid):
							 | 
						||
| 
								 | 
							
								        if mode == 0x01: # Mode: Show current data
							 | 
						||
| 
								 | 
							
								            if pid == 0x00:   #List supported things
							 | 
						||
| 
								 | 
							
								                return b"\xff\xff\xff\xfe"#b"\xBE\x1F\xB8\x10" #Bitfield, random features
							 | 
						||
| 
								 | 
							
								            elif pid == 0x01: # Monitor Status since DTC cleared
							 | 
						||
| 
								 | 
							
								                return b"\x00\x00\x00\x00" #Bitfield, random features
							 | 
						||
| 
								 | 
							
								            elif pid == 0x04: # Calculated engine load
							 | 
						||
| 
								 | 
							
								                return b"\x2f"
							 | 
						||
| 
								 | 
							
								            elif pid == 0x05: # Engine coolant temperature
							 | 
						||
| 
								 | 
							
								                return b"\x3c"
							 | 
						||
| 
								 | 
							
								            elif pid == 0x0B: # Intake manifold absolute pressure
							 | 
						||
| 
								 | 
							
								                return b"\x90"
							 | 
						||
| 
								 | 
							
								            elif pid == 0x0C: # Engine RPM
							 | 
						||
| 
								 | 
							
								                return b"\x1A\xF8"
							 | 
						||
| 
								 | 
							
								            elif pid == 0x0D: # Vehicle Speed
							 | 
						||
| 
								 | 
							
								                return b"\x53"
							 | 
						||
| 
								 | 
							
								            elif pid == 0x10: # MAF air flow rate
							 | 
						||
| 
								 | 
							
								                return b"\x01\xA0"
							 | 
						||
| 
								 | 
							
								            elif pid == 0x11: # Throttle Position
							 | 
						||
| 
								 | 
							
								                return b"\x90"
							 | 
						||
| 
								 | 
							
								            elif pid == 0x33: # Absolute Barometric Pressure
							 | 
						||
| 
								 | 
							
								                return b"\x90"
							 | 
						||
| 
								 | 
							
								        elif mode == 0x09: # Mode: Request vehicle information
							 | 
						||
| 
								 | 
							
								            if pid == 0x02:   # Show VIN
							 | 
						||
| 
								 | 
							
								                return b"1D4GP00R55B123456"
							 | 
						||
| 
								 | 
							
								            if pid == 0xFC:   # test long multi message. Ligned up for LIN responses
							 | 
						||
| 
								 | 
							
								                return b''.join((struct.pack(">BBH", 0xAA, 0xAA, num+1) for num in range(80)))
							 | 
						||
| 
								 | 
							
								            if pid == 0xFD:   # test long multi message
							 | 
						||
| 
								 | 
							
								                parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80))
							 | 
						||
| 
								 | 
							
								                return b'\xAA\xAA\xAA' + b''.join(parts)
							 | 
						||
| 
								 | 
							
								            if pid == 0xFE:   # test very long multi message
							 | 
						||
| 
								 | 
							
								                parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(584))
							 | 
						||
| 
								 | 
							
								                return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA'
							 | 
						||
| 
								 | 
							
								            if pid == 0xFF:
							 | 
						||
| 
								 | 
							
								                return b'\xAA\x00\x00' +\
							 | 
						||
| 
								 | 
							
								                        b"".join(((b'\xAA'*5)+struct.pack(">H", num+1) for num in range(584)))
							 | 
						||
| 
								 | 
							
								                #return b"\xAA"*100#(0xFFF-3)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								if __name__ == "__main__":
							 | 
						||
| 
								 | 
							
								    serial = os.getenv("SERIAL") if os.getenv("SERIAL") else None
							 | 
						||
| 
								 | 
							
								    kbaud = int(os.getenv("CANKBAUD")) if os.getenv("CANKBAUD") else 500
							 | 
						||
| 
								 | 
							
								    bitwidth = int(os.getenv("CANBITWIDTH")) if os.getenv("CANBITWIDTH") else 0
							 | 
						||
| 
								 | 
							
								    canenable = bool(int(os.getenv("CANENABLE"))) if os.getenv("CANENABLE") else True
							 | 
						||
| 
								 | 
							
								    linenable = bool(int(os.getenv("LINENABLE"))) if os.getenv("LINENABLE") else True
							 | 
						||
| 
								 | 
							
								    sim = ELMCarSimulator(serial, can_kbaud=kbaud, can=canenable, lin=linenable)
							 | 
						||
| 
								 | 
							
								    if(bitwidth == 0):
							 | 
						||
| 
								 | 
							
								        sim.can_mode_11b_29b()
							 | 
						||
| 
								 | 
							
								    if(bitwidth == 11):
							 | 
						||
| 
								 | 
							
								        sim.can_mode_11b()
							 | 
						||
| 
								 | 
							
								    if(bitwidth == 29):
							 | 
						||
| 
								 | 
							
								        sim.can_mode_29b()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    import signal
							 | 
						||
| 
								 | 
							
								    def signal_handler(signal, frame):
							 | 
						||
| 
								 | 
							
								        print('\nShutting down simulator')
							 | 
						||
| 
								 | 
							
								        sim.stop()
							 | 
						||
| 
								 | 
							
								        sim.join()
							 | 
						||
| 
								 | 
							
								        sys.exit(0)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    signal.signal(signal.SIGINT, signal_handler)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    sim.start()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    signal.pause()
							 |