# python library to interface with panda
from __future__ import print_function
import binascii
import struct
import hashlib
import socket
import usb1
import os
import time
import traceback
from dfu import PandaDFU
from esptool import ESPROM , CesantaFlasher
from flash_release import flash_release
from update import ensure_st_up_to_date
from serial import PandaSerial
from isotp import isotp_send , isotp_recv
__version__ = ' 0.0.8 '
BASEDIR = os . path . join ( os . path . dirname ( os . path . realpath ( __file__ ) ) , " ../ " )
DEBUG = os . getenv ( " PANDADEBUG " ) is not None
# *** wifi mode ***
def build_st ( target , mkfile = " Makefile " ) :
from panda import BASEDIR
assert ( os . system ( ' cd %s && make -f %s clean && make -f %s %s >/dev/null ' % ( os . path . join ( BASEDIR , " board " ) , mkfile , mkfile , target ) ) == 0 )
def parse_can_buffer ( dat ) :
ret = [ ]
for j in range ( 0 , len ( dat ) , 0x10 ) :
ddat = dat [ j : j + 0x10 ]
f1 , f2 = struct . unpack ( " II " , ddat [ 0 : 8 ] )
extended = 4
if f1 & extended :
address = f1 >> 3
else :
address = f1 >> 21
dddat = ddat [ 8 : 8 + ( f2 & 0xF ) ]
if DEBUG :
print ( " R %x : %s " % ( address , str ( dddat ) . encode ( " hex " ) ) )
ret . append ( ( address , f2 >> 16 , dddat , ( f2 >> 4 ) & 0xFF ) )
return ret
class PandaWifiStreaming ( object ) :
def __init__ ( self , ip = " 192.168.0.10 " , port = 1338 ) :
self . sock = socket . socket ( socket . AF_INET , socket . SOCK_DGRAM )
self . sock . setblocking ( 0 )
self . ip = ip
self . port = port
self . kick ( )
def kick ( self ) :
# must be called at least every 5 seconds
self . sock . sendto ( " hello " , ( self . ip , self . port ) )
def can_recv ( self ) :
ret = [ ]
while True :
try :
dat , addr = self . sock . recvfrom ( 0x200 * 0x10 )
if addr == ( self . ip , self . port ) :
ret + = parse_can_buffer ( dat )
except socket . error as e :
if e . errno != 35 and e . errno != 11 :
traceback . print_exc ( )
break
return ret
# stupid tunneling of USB over wifi and SPI
class WifiHandle ( object ) :
def __init__ ( self , ip = " 192.168.0.10 " , port = 1337 ) :
self . sock = socket . create_connection ( ( ip , port ) )
def __recv ( self ) :
ret = self . sock . recv ( 0x44 )
length = struct . unpack ( " I " , ret [ 0 : 4 ] ) [ 0 ]
return ret [ 4 : 4 + length ]
def controlWrite ( self , request_type , request , value , index , data , timeout = 0 ) :
# ignore data in reply, panda doesn't use it
return self . controlRead ( request_type , request , value , index , 0 , timeout )
def controlRead ( self , request_type , request , value , index , length , timeout = 0 ) :
self . sock . send ( struct . pack ( " HHBBHHH " , 0 , 0 , request_type , request , value , index , length ) )
return self . __recv ( )
def bulkWrite ( self , endpoint , data , timeout = 0 ) :
if len ( data ) > 0x10 :
raise ValueError ( " Data must not be longer than 0x10 " )
self . sock . send ( struct . pack ( " HH " , endpoint , len ( data ) ) + data )
self . __recv ( ) # to /dev/null
def bulkRead ( self , endpoint , length , timeout = 0 ) :
self . sock . send ( struct . pack ( " HH " , endpoint , 0 ) )
return self . __recv ( )
def close ( self ) :
self . sock . close ( )
# *** normal mode ***
class Panda ( object ) :
SAFETY_NOOUTPUT = 0
SAFETY_HONDA = 1
SAFETY_TOYOTA = 2
SAFETY_GM = 3
SAFETY_HONDA_BOSCH = 4
SAFETY_FORD = 5
SAFETY_CADILLAC = 6
SAFETY_HYUNDAI = 7
SAFETY_TESLA = 8
SAFETY_CHRYSLER = 9
SAFETY_TOYOTA_IPAS = 0x1335
SAFETY_TOYOTA_NOLIMITS = 0x1336
SAFETY_ALLOUTPUT = 0x1337
SAFETY_ELM327 = 0xE327
SERIAL_DEBUG = 0
SERIAL_ESP = 1
SERIAL_LIN1 = 2
SERIAL_LIN2 = 3
GMLAN_CAN2 = 1
GMLAN_CAN3 = 2
REQUEST_IN = usb1 . ENDPOINT_IN | usb1 . TYPE_VENDOR | usb1 . RECIPIENT_DEVICE
REQUEST_OUT = usb1 . ENDPOINT_OUT | usb1 . TYPE_VENDOR | usb1 . RECIPIENT_DEVICE
def __init__ ( self , serial = None , claim = True ) :
self . _serial = serial
self . _handle = None
self . connect ( claim )
def close ( self ) :
self . _handle . close ( )
self . _handle = None
def connect ( self , claim = True , wait = False ) :
if self . _handle != None :
self . close ( )
if self . _serial == " WIFI " :
self . _handle = WifiHandle ( )
print ( " opening WIFI device " )
self . wifi = True
else :
context = usb1 . USBContext ( )
self . _handle = None
self . wifi = False
while 1 :
try :
for device in context . getDeviceList ( skip_on_error = True ) :
#print(device)
if device . getVendorID ( ) == 0xbbaa and device . getProductID ( ) in [ 0xddcc , 0xddee ] :
try :
this_serial = device . getSerialNumber ( )
except Exception :
continue
if self . _serial is None or this_serial == self . _serial :
self . _serial = this_serial
print ( " opening device " , self . _serial , hex ( device . getProductID ( ) ) )
time . sleep ( 1 )
self . bootstub = device . getProductID ( ) == 0xddee
self . legacy = ( device . getbcdDevice ( ) != 0x2300 )
self . _handle = device . open ( )
if claim :
self . _handle . claimInterface ( 0 )
#self._handle.setInterfaceAltSetting(0, 0) #Issue in USB stack
break
except Exception as e :
print ( " exception " , e )
traceback . print_exc ( )
if wait == False or self . _handle != None :
break
assert ( self . _handle != None )
print ( " connected " )
def reset ( self , enter_bootstub = False , enter_bootloader = False ) :
# reset
try :
if enter_bootloader :
self . _handle . controlWrite ( Panda . REQUEST_IN , 0xd1 , 0 , 0 , b ' ' )
else :
if enter_bootstub :
self . _handle . controlWrite ( Panda . REQUEST_IN , 0xd1 , 1 , 0 , b ' ' )
else :
self . _handle . controlWrite ( Panda . REQUEST_IN , 0xd8 , 0 , 0 , b ' ' )
except Exception :
pass
if not enter_bootloader :
self . reconnect ( )
def reconnect ( self ) :
self . close ( )
time . sleep ( 1.0 )
success = False
# wait up to 15 seconds
for i in range ( 0 , 15 ) :
try :
self . connect ( )
success = True
break
except Exception :
print ( " reconnecting is taking %d seconds... " % ( i + 1 ) )
try :
dfu = PandaDFU ( PandaDFU . st_serial_to_dfu_serial ( self . _serial ) )
dfu . recover ( )
except Exception :
pass
time . sleep ( 1.0 )
if not success :
raise Exception ( " reconnect failed " )
@staticmethod
def flash_static ( handle , code ) :
# confirm flasher is present
fr = handle . controlRead ( Panda . REQUEST_IN , 0xb0 , 0 , 0 , 0xc )
assert fr [ 4 : 8 ] == " \xde \xad \xd0 \x0d "
# unlock flash
print ( " flash: unlocking " )
handle . controlWrite ( Panda . REQUEST_IN , 0xb1 , 0 , 0 , b ' ' )
# erase sectors 1 and 2
print ( " flash: erasing " )
handle . controlWrite ( Panda . REQUEST_IN , 0xb2 , 1 , 0 , b ' ' )
handle . controlWrite ( Panda . REQUEST_IN , 0xb2 , 2 , 0 , b ' ' )
# flash over EP2
STEP = 0x10
print ( " flash: flashing " )
for i in range ( 0 , len ( code ) , STEP ) :
handle . bulkWrite ( 2 , code [ i : i + STEP ] )
# reset
print ( " flash: resetting " )
try :
handle . controlWrite ( Panda . REQUEST_IN , 0xd8 , 0 , 0 , b ' ' )
except Exception :
pass
def flash ( self , fn = None , code = None , reconnect = True ) :
if not self . bootstub :
self . reset ( enter_bootstub = True )
assert ( self . bootstub )
if fn is None and code is None :
if self . legacy :
fn = " obj/comma.bin "
print ( " building legacy st code " )
build_st ( fn , " Makefile.legacy " )
else :
fn = " obj/panda.bin "
print ( " building panda st code " )
build_st ( fn )
fn = os . path . join ( BASEDIR , " board " , fn )
if code is None :
with open ( fn ) as f :
code = f . read ( )
# get version
print ( " flash: version is " + self . get_version ( ) )
# do flash
Panda . flash_static ( self . _handle , code )
# reconnect
if reconnect :
self . reconnect ( )
def recover ( self ) :
self . reset ( enter_bootloader = True )
while len ( PandaDFU . list ( ) ) == 0 :
print ( " waiting for DFU... " )
time . sleep ( 0.1 )
dfu = PandaDFU ( PandaDFU . st_serial_to_dfu_serial ( self . _serial ) )
dfu . recover ( )
# reflash after recover
self . connect ( True , True )
self . flash ( )
@staticmethod
def flash_ota_st ( ) :
ret = os . system ( " cd %s && make clean && make ota " % ( os . path . join ( BASEDIR , " board " ) ) )
time . sleep ( 1 )
return ret == 0
@staticmethod
def flash_ota_wifi ( ) :
ret = os . system ( " cd %s && make clean && make ota " % ( os . path . join ( BASEDIR , " boardesp " ) ) )
time . sleep ( 1 )
return ret == 0
@staticmethod
def list ( ) :
context = usb1 . USBContext ( )
ret = [ ]
try :
for device in context . getDeviceList ( skip_on_error = True ) :
if device . getVendorID ( ) == 0xbbaa and device . getProductID ( ) in [ 0xddcc , 0xddee ] :
try :
ret . append ( device . getSerialNumber ( ) )
except Exception :
continue
except Exception :
pass
# TODO: detect if this is real
#ret += ["WIFI"]
return ret
def call_control_api ( self , msg ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , msg , 0 , 0 , b ' ' )
# ******************* health *******************
def health ( self ) :
dat = self . _handle . controlRead ( Panda . REQUEST_IN , 0xd2 , 0 , 0 , 13 )
a = struct . unpack ( " IIBBBBB " , dat )
return { " voltage " : a [ 0 ] , " current " : a [ 1 ] ,
" started " : a [ 2 ] , " controls_allowed " : a [ 3 ] ,
" gas_interceptor_detected " : a [ 4 ] ,
" started_signal_detected " : a [ 5 ] ,
" started_alt " : a [ 6 ] }
# ******************* control *******************
def enter_bootloader ( self ) :
try :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xd1 , 0 , 0 , b ' ' )
except Exception as e :
print ( e )
pass
def get_version ( self ) :
return self . _handle . controlRead ( Panda . REQUEST_IN , 0xd6 , 0 , 0 , 0x40 )
def is_grey ( self ) :
ret = self . _handle . controlRead ( Panda . REQUEST_IN , 0xc1 , 0 , 0 , 0x40 )
return ret == " \x01 "
def get_serial ( self ) :
dat = self . _handle . controlRead ( Panda . REQUEST_IN , 0xd0 , 0 , 0 , 0x20 )
hashsig , calc_hash = dat [ 0x1c : ] , hashlib . sha1 ( dat [ 0 : 0x1c ] ) . digest ( ) [ 0 : 4 ]
assert ( hashsig == calc_hash )
return [ dat [ 0 : 0x10 ] , dat [ 0x10 : 0x10 + 10 ] ]
def get_secret ( self ) :
return self . _handle . controlRead ( Panda . REQUEST_IN , 0xd0 , 1 , 0 , 0x10 )
# ******************* configuration *******************
def set_usb_power ( self , on ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xe6 , int ( on ) , 0 , b ' ' )
def set_esp_power ( self , on ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xd9 , int ( on ) , 0 , b ' ' )
def esp_reset ( self , bootmode = 0 ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xda , int ( bootmode ) , 0 , b ' ' )
time . sleep ( 0.2 )
def set_safety_mode ( self , mode = SAFETY_NOOUTPUT ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xdc , mode , 0 , b ' ' )
def set_can_forwarding ( self , from_bus , to_bus ) :
# TODO: This feature may not work correctly with saturated buses
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xdd , from_bus , to_bus , b ' ' )
def set_gmlan ( self , bus = 2 ) :
if bus is None :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xdb , 0 , 0 , b ' ' )
elif bus in [ Panda . GMLAN_CAN2 , Panda . GMLAN_CAN3 ] :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xdb , 1 , bus , b ' ' )
def set_can_loopback ( self , enable ) :
# set can loopback mode for all buses
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xe5 , int ( enable ) , 0 , b ' ' )
def set_can_speed_kbps ( self , bus , speed ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xde , bus , int ( speed * 10 ) , b ' ' )
def set_uart_baud ( self , uart , rate ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xe4 , uart , rate / 300 , b ' ' )
def set_uart_parity ( self , uart , parity ) :
# parity, 0=off, 1=even, 2=odd
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xe2 , uart , parity , b ' ' )
def set_uart_callback ( self , uart , install ) :
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xe3 , uart , int ( install ) , b ' ' )
# ******************* can *******************
def can_send_many ( self , arr ) :
snds = [ ]
transmit = 1
extended = 4
for addr , _ , dat , bus in arr :
assert len ( dat ) < = 8
if DEBUG :
print ( " W %x : %s " % ( addr , dat . encode ( " hex " ) ) )
if addr > = 0x800 :
rir = ( addr << 3 ) | transmit | extended
else :
rir = ( addr << 21 ) | transmit
snd = struct . pack ( " II " , rir , len ( dat ) | ( bus << 4 ) ) + dat
snd = snd . ljust ( 0x10 , b ' \x00 ' )
snds . append ( snd )
while True :
try :
#print("DAT: %s"%b''.join(snds).__repr__())
if self . wifi :
for s in snds :
self . _handle . bulkWrite ( 3 , s )
else :
self . _handle . bulkWrite ( 3 , b ' ' . join ( snds ) )
break
except ( usb1 . USBErrorIO , usb1 . USBErrorOverflow ) :
print ( " CAN: BAD SEND MANY, RETRYING " )
def can_send ( self , addr , dat , bus ) :
self . can_send_many ( [ [ addr , None , dat , bus ] ] )
def can_recv ( self ) :
dat = bytearray ( )
while True :
try :
dat = self . _handle . bulkRead ( 1 , 0x10 * 256 )
break
except ( usb1 . USBErrorIO , usb1 . USBErrorOverflow ) :
print ( " CAN: BAD RECV, RETRYING " )
return parse_can_buffer ( dat )
def can_clear ( self , bus ) :
""" Clears all messages from the specified internal CAN ringbuffer as
though it were drained .
Args :
bus ( int ) : can bus number to clear a tx queue , or 0xFFFF to clear the
global can rx queue .
"""
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xf1 , bus , 0 , b ' ' )
# ******************* isotp *******************
def isotp_send ( self , addr , dat , bus , recvaddr = None , subaddr = None ) :
return isotp_send ( self , dat , addr , bus , recvaddr , subaddr )
def isotp_recv ( self , addr , bus = 0 , sendaddr = None , subaddr = None ) :
return isotp_recv ( self , addr , bus , sendaddr , subaddr )
# ******************* serial *******************
def serial_read ( self , port_number ) :
ret = [ ]
while 1 :
lret = bytes ( self . _handle . controlRead ( Panda . REQUEST_IN , 0xe0 , port_number , 0 , 0x40 ) )
if len ( lret ) == 0 :
break
ret . append ( lret )
return b ' ' . join ( ret )
def serial_write ( self , port_number , ln ) :
ret = 0
for i in range ( 0 , len ( ln ) , 0x20 ) :
ret + = self . _handle . bulkWrite ( 2 , struct . pack ( " B " , port_number ) + ln [ i : i + 0x20 ] )
return ret
def serial_clear ( self , port_number ) :
""" Clears all messages (tx and rx) from the specified internal uart
ringbuffer as though it were drained .
Args :
port_number ( int ) : port number of the uart to clear .
"""
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xf2 , port_number , 0 , b ' ' )
# ******************* kline *******************
# pulse low for wakeup
def kline_wakeup ( self ) :
if DEBUG :
print ( " kline wakeup... " )
self . _handle . controlWrite ( Panda . REQUEST_OUT , 0xf0 , 0 , 0 , b ' ' )
if DEBUG :
print ( " kline wakeup done " )
def kline_drain ( self , bus = 2 ) :
# drain buffer
bret = bytearray ( )
while True :
ret = self . _handle . controlRead ( Panda . REQUEST_IN , 0xe0 , bus , 0 , 0x40 )
if len ( ret ) == 0 :
break
elif DEBUG :
print ( " kline drain: " + str ( ret ) . encode ( " hex " ) )
bret + = ret
return bytes ( bret )
def kline_ll_recv ( self , cnt , bus = 2 ) :
echo = bytearray ( )
while len ( echo ) != cnt :
ret = str ( self . _handle . controlRead ( Panda . REQUEST_OUT , 0xe0 , bus , 0 , cnt - len ( echo ) ) )
if DEBUG and len ( ret ) > 0 :
print ( " kline recv: " + ret . encode ( " hex " ) )
echo + = ret
return str ( echo )
def kline_send ( self , x , bus = 2 , checksum = True ) :
def get_checksum ( dat ) :
result = 0
result + = sum ( map ( ord , dat ) ) if isinstance ( b ' dat ' , str ) else sum ( dat )
result = - result
return struct . pack ( " B " , result % 0x100 )
self . kline_drain ( bus = bus )
if checksum :
x + = get_checksum ( x )
for i in range ( 0 , len ( x ) , 0xf ) :
ts = x [ i : i + 0xf ]
if DEBUG :
print ( " kline send: " + ts . encode ( " hex " ) )
self . _handle . bulkWrite ( 2 , chr ( bus ) . encode ( ) + ts )
echo = self . kline_ll_recv ( len ( ts ) , bus = bus )
if echo != ts :
print ( " **** ECHO ERROR %d **** " % i )
print ( binascii . hexlify ( echo ) )
print ( binascii . hexlify ( ts ) )
assert echo == ts
def kline_recv ( self , bus = 2 ) :
msg = self . kline_ll_recv ( 2 , bus = bus )
msg + = self . kline_ll_recv ( ord ( msg [ 1 ] ) - 2 , bus = bus )
return msg