import sys
import time
import struct
from enum import IntEnum
class COMMAND_CODE ( IntEnum ) :
CONNECT = 0xFF
DISCONNECT = 0xFE
GET_STATUS = 0xFD
SYNCH = 0xFC
GET_COMM_MODE_INFO = 0xFB
GET_ID = 0xFA
SET_REQUEST = 0xF9
GET_SEED = 0xF8
UNLOCK = 0xF7
SET_MTA = 0xF6
UPLOAD = 0xF5
SHORT_UPLOAD = 0xF4
BUILD_CHECKSUM = 0xF3
TRANSPORT_LAYER_CMD = 0xF2
USER_CMD = 0xF1
DOWNLOAD = 0xF0
DOWNLOAD_NEXT = 0xEF
DOWNLOAD_MAX = 0xEE
SHORT_DOWNLOAD = 0xED
MODIFY_BITS = 0xEC
SET_CAL_PAGE = 0xEB
GET_CAL_PAGE = 0xEA
GET_PAG_PROCESSOR_INFO = 0xE9
GET_SEGMENT_INFO = 0xE8
GET_PAGE_INFO = 0xE7
SET_SEGMENT_MODE = 0xE6
GET_SEGMENT_MODE = 0xE5
COPY_CAL_PAGE = 0xE4
CLEAR_DAQ_LIST = 0xE3
SET_DAQ_PTR = 0xE2
WRITE_DAQ = 0xE1
SET_DAQ_LIST_MODE = 0xE0
GET_DAQ_LIST_MODE = 0xDF
START_STOP_DAQ_LIST = 0xDE
START_STOP_SYNCH = 0xDD
GET_DAQ_CLOCK = 0xDC
READ_DAQ = 0xDB
GET_DAQ_PROCESSOR_INFO = 0xDA
GET_DAQ_RESOLUTION_INFO = 0xD9
GET_DAQ_LIST_INFO = 0xD8
GET_DAQ_EVENT_INFO = 0xD7
FREE_DAQ = 0xD6
ALLOC_DAQ = 0xD5
ALLOC_ODT = 0xD4
ALLOC_ODT_ENTRY = 0xD3
PROGRAM_START = 0xD2
PROGRAM_CLEAR = 0xD1
PROGRAM = 0xD0
PROGRAM_RESET = 0xCF
GET_PGM_PROCESSOR_INFO = 0xCE
GET_SECTOR_INFO = 0xCD
PROGRAM_PREPARE = 0xCC
PROGRAM_FORMAT = 0xCB
PROGRAM_NEXT = 0xCA
PROGRAM_MAX = 0xC9
PROGRAM_VERIFY = 0xC8
ERROR_CODES = {
0x00 : " Command processor synchronization " ,
0x10 : " Command was not executed " ,
0x11 : " Command rejected because DAQ is running " ,
0x12 : " Command rejected because PGM is running " ,
0x20 : " Unknown command or not implemented optional command " ,
0x21 : " Command syntax invalid " ,
0x22 : " Command syntax valid but command parameter(s) out of range " ,
0x23 : " The memory location is write protected " ,
0x24 : " The memory location is not accessible " ,
0x25 : " Access denied, Seed & Key is required " ,
0x26 : " Selected page not available " ,
0x27 : " Selected page mode not available " ,
0x28 : " Selected segment not valid " ,
0x29 : " Sequence error " ,
0x2A : " DAQ configuration not valid " ,
0x30 : " Memory overflow error " ,
0x31 : " Generic error " ,
0x32 : " The slave internal program verify routine detects an error " ,
}
class CONNECT_MODE ( IntEnum ) :
NORMAL = 0x00 ,
USER_DEFINED = 0x01 ,
class GET_ID_REQUEST_TYPE ( IntEnum ) :
ASCII = 0x00 ,
ASAM_MC2_FILE = 0x01 ,
ASAM_MC2_PATH = 0x02 ,
ASAM_MC2_URL = 0x03 ,
ASAM_MC2_UPLOAD = 0x04 ,
# 128-255 user defined
class CommandTimeoutError ( Exception ) :
pass
class CommandCounterError ( Exception ) :
pass
class CommandResponseError ( Exception ) :
def __init__ ( self , message , return_code ) :
super ( ) . __init__ ( )
self . message = message
self . return_code = return_code
def __str__ ( self ) :
return self . message
class XcpClient :
def __init__ ( self , panda , tx_addr : int , rx_addr : int , bus : int = 0 , timeout : float = 0.1 , debug = False , pad = True ) :
self . tx_addr = tx_addr
self . rx_addr = rx_addr
self . can_bus = bus
self . timeout = timeout
self . debug = debug
self . _panda = panda
self . _byte_order = " > "
self . _max_cto = 8
self . _max_dto = 8
self . pad = pad
def _send_cto ( self , cmd : int , dat : bytes = b " " ) - > None :
tx_data = ( bytes ( [ cmd ] ) + dat )
# Some ECUs don't respond if the packets are not padded to 8 bytes
if self . pad :
tx_data = tx_data . ljust ( 8 , b " \x00 " )
if self . debug :
print ( " CAN-CLEAR: TX " )
self . _panda . can_clear ( self . can_bus )
if self . debug :
print ( " CAN-CLEAR: RX " )
self . _panda . can_clear ( 0xFFFF )
if self . debug :
print ( f " CAN-TX: { hex ( self . tx_addr ) } - 0x { bytes . hex ( tx_data ) } " )
self . _panda . can_send ( self . tx_addr , tx_data , self . can_bus )
def _recv_dto ( self , timeout : float ) - > bytes :
start_time = time . time ( )
while time . time ( ) - start_time < timeout :
msgs = self . _panda . can_recv ( ) or [ ]
if len ( msgs ) > = 256 :
print ( " CAN RX buffer overflow!!! " , file = sys . stderr )
for rx_addr , rx_data , rx_bus in msgs :
if rx_bus == self . can_bus and rx_addr == self . rx_addr :
rx_data = bytes ( rx_data ) # convert bytearray to bytes
if self . debug :
print ( f " CAN-RX: { hex ( rx_addr ) } - 0x { bytes . hex ( rx_data ) } " )
pid = rx_data [ 0 ]
if pid == 0xFE :
err = rx_data [ 1 ]
err_desc = ERROR_CODES . get ( err , " unknown error " )
dat = rx_data [ 2 : ]
raise CommandResponseError ( f " { hex ( err ) } - { err_desc } { dat } " , err )
return bytes ( rx_data [ 1 : ] )
time . sleep ( 0.001 )
raise CommandTimeoutError ( " timeout waiting for response " )
# commands
def connect ( self , connect_mode : CONNECT_MODE = CONNECT_MODE . NORMAL ) - > dict :
self . _send_cto ( COMMAND_CODE . CONNECT , bytes ( [ connect_mode ] ) )
resp = self . _recv_dto ( self . timeout )
assert len ( resp ) == 7 , f " incorrect data length: { len ( resp ) } "
self . _byte_order = " > " if resp [ 1 ] & 0x01 else " < "
self . _slave_block_mode = resp [ 1 ] & 0x40 != 0
self . _max_cto = resp [ 2 ]
self . _max_dto = struct . unpack ( f " { self . _byte_order } H " , resp [ 3 : 5 ] ) [ 0 ]
return {
" cal_support " : resp [ 0 ] & 0x01 != 0 ,
" daq_support " : resp [ 0 ] & 0x04 != 0 ,
" stim_support " : resp [ 0 ] & 0x08 != 0 ,
" pgm_support " : resp [ 0 ] & 0x10 != 0 ,
" byte_order " : self . _byte_order ,
" address_granularity " : 2 * * ( ( resp [ 1 ] & 0x06 ) >> 1 ) ,
" slave_block_mode " : self . _slave_block_mode ,
" optional " : resp [ 1 ] & 0x80 != 0 ,
" max_cto " : self . _max_cto ,
" max_dto " : self . _max_dto ,
" protocol_version " : resp [ 5 ] ,
" transport_version " : resp [ 6 ] ,
}
def disconnect ( self ) - > None :
self . _send_cto ( COMMAND_CODE . DISCONNECT )
resp = self . _recv_dto ( self . timeout )
assert len ( resp ) == 0 , f " incorrect data length: { len ( resp ) } "
def get_id ( self , req_id_type : GET_ID_REQUEST_TYPE = GET_ID_REQUEST_TYPE . ASCII ) - > dict :
if req_id_type > 255 :
raise ValueError ( " request id type must be less than 255 " )
self . _send_cto ( COMMAND_CODE . GET_ID , bytes ( [ req_id_type ] ) )
resp = self . _recv_dto ( self . timeout )
return {
# mode = 0 means MTA was set
# mode = 1 means data is at end (only CAN-FD has space for this)
" mode " : resp [ 0 ] ,
" length " : struct . unpack ( f " { self . _byte_order } I " , resp [ 3 : 7 ] ) [ 0 ] ,
" identifier " : resp [ 7 : ] if self . _max_cto > 8 else None
}
def get_seed ( self , mode : int = 0 ) - > bytes :
if mode > 255 :
raise ValueError ( " mode must be less than 255 " )
self . _send_cto ( COMMAND_CODE . GET_SEED , bytes ( [ 0 , mode ] ) )
# TODO: add support for longer seeds spread over multiple blocks
ret = self . _recv_dto ( self . timeout )
length = ret [ 0 ]
return ret [ 1 : length + 1 ]
def unlock ( self , key : bytes ) - > bytes :
# TODO: add support for longer keys spread over multiple blocks
self . _send_cto ( COMMAND_CODE . UNLOCK , bytes ( [ len ( key ) ] ) + key )
return self . _recv_dto ( self . timeout )
def set_mta ( self , addr : int , addr_ext : int = 0 ) - > bytes :
if addr_ext > 255 :
raise ValueError ( " address extension must be less than 256 " )
# TODO: this looks broken (missing addr extension)
self . _send_cto ( COMMAND_CODE . SET_MTA , bytes ( [ 0x00 , 0x00 , addr_ext ] ) + struct . pack ( f " { self . _byte_order } I " , addr ) )
return self . _recv_dto ( self . timeout )
def upload ( self , size : int ) - > bytes :
if size > 255 :
raise ValueError ( " size must be less than 256 " )
if not self . _slave_block_mode and size > self . _max_dto - 1 :
raise ValueError ( " block mode not supported " )
self . _send_cto ( COMMAND_CODE . UPLOAD , bytes ( [ size ] ) )
resp = b " "
while len ( resp ) < size :
resp + = self . _recv_dto ( self . timeout ) [ : size - len ( resp ) + 1 ]
return resp [ : size ] # trim off bytes with undefined values
def short_upload ( self , size : int , addr_ext : int , addr : int ) - > bytes :
if size > 6 :
raise ValueError ( " size must be less than 7 " )
if addr_ext > 255 :
raise ValueError ( " address extension must be less than 256 " )
self . _send_cto ( COMMAND_CODE . SHORT_UPLOAD , bytes ( [ size , 0x00 , addr_ext ] ) + struct . pack ( f " { self . _byte_order } I " , addr ) )
return self . _recv_dto ( self . timeout ) [ : size ] # trim off bytes with undefined values
def download ( self , data : bytes ) - > bytes :
size = len ( data )
if size > 255 :
raise ValueError ( " size must be less than 256 " )
if not self . _slave_block_mode and size > self . _max_dto - 2 :
raise ValueError ( " block mode not supported " )
self . _send_cto ( COMMAND_CODE . DOWNLOAD , bytes ( [ size ] ) + data )
return self . _recv_dto ( self . timeout ) [ : size ]