import  datetime 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  random 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  threading 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  statistics  import  mean 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . swaglog  import  cloudlog 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								PANDA_OUTPUT_VOLTAGE  =  5.28 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# Parameters 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_battery_capacity ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  _read_param ( " /sys/class/power_supply/battery/capacity " ,  int ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_battery_status ( ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  # This does not correspond with actual charging or not. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # If a USB cable is plugged in, it responds with 'Charging', even when charging is disabled 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  _read_param ( " /sys/class/power_supply/battery/status " ,  lambda  x :  x . strip ( ) ,  ' ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_battery_current ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  _read_param ( " /sys/class/power_supply/battery/current_now " ,  int ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_battery_voltage ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  _read_param ( " /sys/class/power_supply/battery/voltage_now " ,  int ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_usb_present ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  _read_param ( " /sys/class/power_supply/usb/present " ,  lambda  x :  bool ( int ( x ) ) ,  False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_battery_charging ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # This does correspond with actually charging 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  _read_param ( " /sys/class/power_supply/battery/charge_type " ,  lambda  x :  x . strip ( )  !=  " N/A " ,  False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  set_battery_charging ( on ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  with  open ( ' /sys/class/power_supply/battery/charging_enabled ' ,  ' w ' )  as  f : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    f . write ( f " { 1  if  on  else  0 } \n " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# Helpers 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  _read_param ( path ,  parser ,  default = 0 ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    with  open ( path )  as  f : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  parser ( f . read ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  default 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  panda_current_to_actual_current ( panda_current ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # From white/grey panda schematic 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  ( 3.3  -  ( panda_current  *  3.3  /  4096 ) )  /  8.25 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  PowerMonitoring : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . last_measurement_time  =  None            # Used for integration delta 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . power_used_uWh  =  0                      # Integrated power usage in uWh since going into offroad 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . next_pulsed_measurement_time  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . integration_lock  =  threading . Lock ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # Calculation tick 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  calculate ( self ,  health ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      now  =  time . time ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Check that time is valid 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  datetime . datetime . fromtimestamp ( now ) . year  <  2019 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Only integrate when there is no ignition 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # If health is None, we're probably not in a car, so we don't care 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  health  is  None  or  ( health . health . ignitionLine  or  health . health . ignitionCan )  or  \
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								         health . health . hwType  ==  log . HealthData . HwType . unknown : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        with  self . integration_lock : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . last_measurement_time  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . next_pulsed_measurement_time  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . power_used_uWh  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # First measurement, set integration time 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      with  self . integration_lock : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . last_measurement_time  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . last_measurement_time  =  now 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      is_uno  =  health . health . hwType  ==  log . HealthData . HwType . uno 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Get current power draw somehow 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      current_power  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  get_battery_status ( )  ==  ' Discharging ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # If the battery is discharging, we can use this measurement 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # On C2: this is low by about 10-15%, probably mostly due to UNO draw not being factored in 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        current_power  =  ( ( get_battery_voltage ( )  /  1000000 )  *  ( get_battery_current ( )  /  1000000 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  ( health . health . hwType  in  [ log . HealthData . HwType . whitePanda ,  log . HealthData . HwType . greyPanda ] )  and  ( health . health . current  >  1 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # If white/grey panda, use the integrated current measurements if the measurement is not 0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # If the measurement is 0, the current is 400mA or greater, and out of the measurement range of the panda 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # This seems to be accurate to about 5% 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        current_power  =  ( PANDA_OUTPUT_VOLTAGE  *  panda_current_to_actual_current ( health . health . current ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      elif  ( self . next_pulsed_measurement_time  is  not  None )  and  ( self . next_pulsed_measurement_time  < =  now ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # TODO: Figure out why this is off by a factor of 3/4??? 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        FUDGE_FACTOR  =  1.33 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Turn off charging for about 10 sec in a thread that does not get killed on SIGINT, and perform measurement here to avoid blocking thermal 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        def  perform_pulse_measurement ( now ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            set_battery_charging ( False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            time . sleep ( 5 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            # Measure for a few sec to get a good average 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            voltages  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            currents  =  [ ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								            for  _  in  range ( 6 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              voltages . append ( get_battery_voltage ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              currents . append ( get_battery_current ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              time . sleep ( 1 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								            current_power  =  ( ( mean ( voltages )  /  1000000 )  *  ( mean ( currents )  /  1000000 ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            self . _perform_integration ( now ,  current_power  *  FUDGE_FACTOR ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            # Enable charging again 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            set_battery_charging ( True ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            cloudlog . exception ( " Pulsed power measurement failed " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Start pulsed measurement and return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        threading . Thread ( target = perform_pulse_measurement ,  args = ( now , ) ) . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . next_pulsed_measurement_time  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      elif  self . next_pulsed_measurement_time  is  None  and  not  is_uno : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # On a charging EON with black panda, or drawing more than 400mA out of a white/grey one 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Only way to get the power draw is to turn off charging for a few sec and check what the discharging rate is 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # We shouldn't do this very often, so make sure it has been some long-ish random time interval 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . next_pulsed_measurement_time  =  now  +  random . randint ( 120 ,  180 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Do nothing 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Do the integration 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . _perform_integration ( now ,  current_power ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      cloudlog . exception ( " Power monitoring calculation failed " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _perform_integration ( self ,  t ,  current_power ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    with  self . integration_lock : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . last_measurement_time : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          integration_time_h  =  ( t  -  self . last_measurement_time )  /  3600 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          power_used  =  ( current_power  *  1000000 )  *  integration_time_h 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  power_used  <  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            raise  ValueError ( f " Negative power used! Integration time:  { integration_time_h }  h Current Power:  { power_used }  uWh " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . power_used_uWh  + =  power_used 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          self . last_measurement_time  =  t 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . exception ( " Integration failed " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # Get the power usage 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  get_power_used ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  int ( self . power_used_uWh )