import time
from contextlib import AbstractContextManager
from panda import Panda
from opendbc . car . car_helpers import get_car
from opendbc . car . can_definitions import CanData
from opendbc . car . structs import CarParams , CarControl
class PandaRunner ( AbstractContextManager ) :
def __enter__ ( self ) :
self . p = Panda ( )
self . p . reset ( )
# setup + fingerprinting
self . p . set_safety_mode ( CarParams . SafetyModel . elm327 , 1 )
self . CI = get_car ( self . _can_recv , self . p . can_send_many , self . p . set_obd , True , False )
assert self . CI . CP . carFingerprint . lower ( ) != " mock " , " Unable to identify car. Check connections and ensure car is supported. "
safety_model = self . CI . CP . safetyConfigs [ 0 ] . safetyModel
self . p . set_safety_mode ( CarParams . SafetyModel . elm327 , 1 )
self . CI . init ( self . CI . CP , self . _can_recv , self . p . can_send_many )
self . p . set_safety_mode ( safety_model , self . CI . CP . safetyConfigs [ 0 ] . safetyParam )
return self
def __exit__ ( self , exc_type , exc_value , traceback ) :
self . p . set_safety_mode ( CarParams . SafetyModel . noOutput )
self . p . reset ( ) # avoid siren
return super ( ) . __exit__ ( exc_type , exc_value , traceback )
@property
def panda ( self ) - > Panda :
return self . p
def _can_recv ( self , wait_for_one : bool = False ) - > list [ list [ CanData ] ] :
recv = self . p . can_recv ( )
while len ( recv ) == 0 and wait_for_one :
recv = self . p . can_recv ( )
return [ [ CanData ( addr , dat , bus ) for addr , dat , bus in recv ] , ]
def read ( self , strict : bool = True ) :
cs = self . CI . update ( [ int ( time . monotonic ( ) * 1e9 ) , self . _can_recv ( ) [ 0 ] ] )
if strict :
assert cs . canValid , " CAN went invalid, check connections "
return cs
def write ( self , cc : CarControl ) - > None :
if cc . enabled and not self . p . health ( ) [ ' controls_allowed ' ] :
# prevent the car from faulting. print a warning?
cc = CarControl ( enabled = False )
_ , can_sends = self . CI . apply ( cc )
self . p . can_send_many ( can_sends , timeout = 25 )
self . p . send_heartbeat ( )
if __name__ == " __main__ " :
with PandaRunner ( ) as p :
print ( p . read ( ) )