#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# simple boardd wrapper that updates the panda first 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  usb1 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  subprocess 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  typing  import  List ,  NoReturn 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  functools  import  cmp_to_key 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  panda  import  Panda ,  PandaDFU 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common . basedir  import  BASEDIR 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  system . hardware  import  HARDWARE 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  system . swaglog  import  cloudlog 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def get_expected_signature(panda: Panda) -> bytes:
								  """Return the signature of the firmware image expected on ``panda``.

								  The image path is read from ``panda.get_mcu_type().config.app_path`` and
								  passed to ``Panda.get_signature_from_firmware``.

								  Returns:
								    The signature bytes, or ``b""`` if any step of that lookup raises. The
								    failure is logged via ``cloudlog.exception`` and not propagated.
								  """
								  try:
								    app_path = panda.get_mcu_type().config.app_path
								    return Panda.get_signature_from_firmware(app_path)
								  except Exception:
								    # deliberately best-effort: log and fall back to an empty signature
								    cloudlog.exception("Error computing expected signature")
								    return b""
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def flash_panda(panda_serial: str) -> Panda:
								  """Connect to the panda with ``panda_serial`` and bring its firmware up to date.

								  Steps, in order:
								    1. flash when the panda is in bootstub or its signature differs from the
								       one returned by ``get_expected_signature``
								    2. if it is still in bootstub afterwards, run the recovery path
								    3. check that it left bootstub and that its signature now matches

								  Args:
								    panda_serial: serial passed to the ``Panda`` constructor.

								  Returns:
								    The connected ``Panda`` (left open; the caller closes it).

								  Raises:
								    AssertionError: the panda is still in bootstub after recovery, or its
								      signature still differs from the expected one.
								  """
								  panda = Panda(panda_serial)

								  fw_signature = get_expected_signature(panda)
								  # evaluated before any flashing; selects the recovery path used below
								  internal_panda = panda.is_internal() and not panda.bootstub

								  panda_version = "bootstub" if panda.bootstub else panda.get_version()
								  panda_signature = b"" if panda.bootstub else panda.get_signature()
								  cloudlog.warning(f"Panda {panda_serial} connected, version: {panda_version}, signature {panda_signature.hex()[:16]}, expected {fw_signature.hex()[:16]}")

								  if panda.bootstub or panda_signature != fw_signature:
								    cloudlog.info("Panda firmware out of date, update required")
								    panda.flash()
								    cloudlog.info("Done flashing")

								  if panda.bootstub:
								    bootstub_version = panda.get_version()
								    cloudlog.info(f"Flashed firmware not booting, flashing development bootloader. Bootstub version: {bootstub_version}")
								    if internal_panda:
								      HARDWARE.recover_internal_panda()
								    # the reset is skipped for the internal panda handled by HARDWARE above
								    panda.recover(reset=(not internal_panda))
								    cloudlog.info("Done flashing bootloader")

								  if panda.bootstub:
								    cloudlog.info("Panda still not booting, exiting")
								    raise AssertionError(f"Panda {panda_serial} still in bootstub after recovery")

								  panda_signature = panda.get_signature()
								  if panda_signature != fw_signature:
								    cloudlog.info("Version mismatch after flashing, exiting")
								    raise AssertionError(f"Panda {panda_serial} signature mismatch after flashing")

								  return panda
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def panda_sort_cmp(a: Panda, b: Panda):
								  """Three-way comparison of two pandas, for use with ``functools.cmp_to_key``.

								  Ordering, most significant first:
								    1. an internal panda sorts before a non-internal one
								    2. ascending by ``get_type()``
								    3. ascending by ``get_usb_serial()``

								  Returns:
								    A negative int if ``a`` sorts before ``b``, a positive int if it sorts
								    after, and 0 if all three criteria are equal. A cmp function must
								    return a signed value: returning the bool from ``<`` can never signal
								    "a before b", so the sort would silently keep the input order.
								  """
								  # make sure the internal one is always first
								  a_internal = bool(a.is_internal())
								  b_internal = bool(b.is_internal())
								  if a_internal != b_internal:
								    return -1 if a_internal else 1

								  # sort by hardware type
								  a_type = a.get_type()
								  b_type = b.get_type()
								  if a_type != b_type:
								    return -1 if a_type < b_type else 1

								  # last resort: sort by serial number
								  a_serial = a.get_usb_serial()
								  b_serial = b.get_usb_serial()
								  return (a_serial > b_serial) - (a_serial < b_serial)
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def main() -> NoReturn:
								  """Flash every attached panda, then run boardd with their serials; loop forever.

								  Each iteration:
								    1. removes the "PandaSignatures" param
								    2. recovers every panda returned by ``PandaDFU.list()``
								    3. flashes every panda returned by ``Panda.list()`` via ``flash_panda``
								    4. records a lost heartbeat and, on the first run only, resets each panda
								    5. sorts the pandas, stores their signatures and closes the handles
								    6. runs ``./boardd`` with the sorted serials and waits for it to exit

								  ``usb1.USBErrorNoDevice`` and ``usb1.USBErrorPipe`` raised during steps 1-5
								  are logged and the iteration restarts. Other exceptions propagate,
								  including ``AssertionError`` from ``flash_panda`` and
								  ``subprocess.CalledProcessError`` from the ``check=True`` boardd call.
								  """
								  # stays True until one iteration gets all the way through setup
								  first_run = True
								  params = Params()

								  while True:
								    try:
								      params.remove("PandaSignatures")

								      # Flash all Pandas in DFU mode
								      for dfu_serial in PandaDFU.list():
								        cloudlog.info(f"Panda in DFU mode found, flashing recovery {dfu_serial}")
								        PandaDFU(dfu_serial).recover()
								      time.sleep(1)

								      panda_serials = Panda.list()
								      if not panda_serials:
								        if first_run:
								          cloudlog.info("Resetting internal panda")
								          HARDWARE.reset_internal_panda()
								          time.sleep(2)  # wait to come back up
								        continue

								      cloudlog.info(f"{len(panda_serials)} panda(s) found, connecting - {panda_serials}")

								      # Flash pandas
								      pandas: List[Panda] = [flash_panda(serial) for serial in panda_serials]

								      # check health for lost heartbeat
								      for panda in pandas:
								        health = panda.health()
								        if health["heartbeat_lost"]:
								          params.put_bool("PandaHeartbeatLost", True)
								          cloudlog.event("heartbeat lost", deviceState=health, serial=panda.get_usb_serial())

								        if first_run:
								          cloudlog.info(f"Resetting panda {panda.get_usb_serial()}")
								          panda.reset()

								      # sort pandas to have deterministic order
								      pandas.sort(key=cmp_to_key(panda_sort_cmp))
								      panda_serials = [p.get_usb_serial() for p in pandas]

								      # log panda fw versions
								      params.put("PandaSignatures", b','.join(p.get_signature() for p in pandas))

								      # close all pandas
								      for p in pandas:
								        p.close()
								    except (usb1.USBErrorNoDevice, usb1.USBErrorPipe):
								      # a panda was disconnected while setting everything up. let's try again
								      cloudlog.exception("Panda USB exception while setting up")
								      continue

								    first_run = False

								    # run boardd with all connected serials as arguments
								    os.environ['MANAGER_DAEMON'] = 'boardd'
								    os.chdir(os.path.join(BASEDIR, "selfdrive/boardd"))
								    subprocess.run(["./boardd", *panda_serials], check=True)
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# script entry point: only run the flashing loop when executed directly
								if __name__ == "__main__":
								  main()