#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  random 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  collections  import  defaultdict 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  car 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  common . spinner  import  Spinner 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  common . timeout  import  Timeout 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . boardd . boardd  import  can_list_to_can_capnp 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . car  import  make_can_msg 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  system . hardware  import  TICI 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . test . helpers  import  phone_only ,  with_processes 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  TestBoardd ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUpClass ( cls ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    os . environ [ ' STARTED ' ]  =  ' 1 ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    os . environ [ ' BOARDD_LOOPBACK ' ]  =  ' 1 ' 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cls . spinner  =  Spinner ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  tearDownClass ( cls ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . spinner . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @phone_only 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @with_processes ( [ ' pandad ' ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_loopback ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # wait for boardd to init 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 2 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    with  Timeout ( 60 ,  " boardd didn ' t start " ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sm  =  messaging . SubMaster ( [ ' pandaStates ' ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      while  sm . rcv_frame [ ' pandaStates ' ]  <  1  and  len ( sm [ ' pandaStates ' ] )  ==  0 : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        sm . update ( 1000 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    num_pandas  =  len ( sm [ ' pandaStates ' ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  TICI : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . assertGreater ( num_pandas ,  1 ,  " connect another panda for multipanda tests " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # boardd blocks on CarVin and CarParams 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cp  =  car . CarParams . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    safety_config  =  car . CarParams . SafetyConfig . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    safety_config . safetyModel  =  car . CarParams . SafetyModel . allOutput 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cp . safetyConfigs  =  [ safety_config ] * num_pandas 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    params  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    params . put ( " CarVin " ,  b " 0 " * 17 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    params . put_bool ( " ControlsReady " ,  True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    params . put ( " CarParams " ,  cp . to_bytes ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    sendcan  =  messaging . pub_sock ( ' sendcan ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    can  =  messaging . sub_sock ( ' can ' ,  conflate = False ,  timeout = 100 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    time . sleep ( 0.2 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    n  =  200 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  i  in  range ( n ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . spinner . update ( f " boardd loopback  { i } / { n } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sent_msgs  =  defaultdict ( set ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  _  in  range ( random . randrange ( 10 ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        to_send  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  __  in  range ( random . randrange ( 100 ) ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          bus  =  random . choice ( [ b  for  b  in  range ( 3 * num_pandas )  if  b  %  4  !=  3 ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          addr  =  random . randrange ( 1 ,  1 << 29 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          dat  =  bytes ( random . getrandbits ( 8 )  for  _  in  range ( random . randrange ( 1 ,  9 ) ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          sent_msgs [ bus ] . add ( ( addr ,  dat ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          to_send . append ( make_can_msg ( addr ,  dat ,  bus ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        sendcan . send ( can_list_to_can_capnp ( to_send ,  msgtype = ' sendcan ' ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      for  _  in  range ( 100  *  2 ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        recvd  =  messaging . drain_sock ( can ,  wait_for_one = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  msg  in  recvd : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          for  m  in  msg . can : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            if  m . src  > =  128 : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								              key  =  ( m . address ,  m . dat ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              assert  key  in  sent_msgs [ m . src - 128 ] ,  f " got unexpected msg:  { m . src =}   { m . address =}   { m . dat =} " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              sent_msgs [ m . src - 128 ] . discard ( key ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  all ( len ( v )  ==  0  for  v  in  sent_msgs . values ( ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          break 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # if a set isn't empty, messages got dropped 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      for  bus  in  sent_msgs . keys ( ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        assert  not  len ( sent_msgs [ bus ] ) ,  f " loop  { i } : bus  { bus }  missing  { len ( sent_msgs [ bus ] ) }  messages " 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  unittest . main ( )