You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							67 lines
						
					
					
						
							1.8 KiB
						
					
					
				
			
		
		
	
	
							67 lines
						
					
					
						
							1.8 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import datetime
 | 
						|
import numpy as np
 | 
						|
from typing import List
 | 
						|
from collections import deque
 | 
						|
 | 
						|
from openpilot.common.realtime import Ratekeeper
 | 
						|
from openpilot.common.filter_simple import FirstOrderFilter
 | 
						|
 | 
						|
 | 
						|
def read_power():
 | 
						|
  with open("/sys/bus/i2c/devices/0-0040/hwmon/hwmon1/power1_input") as f:
 | 
						|
    return int(f.read()) / 1e6
 | 
						|
 | 
						|
def sample_power(seconds=5) -> List[float]:
 | 
						|
  rate = 123
 | 
						|
  rk = Ratekeeper(rate, print_delay_threshold=None)
 | 
						|
 | 
						|
  pwrs = []
 | 
						|
  for _ in range(rate*seconds):
 | 
						|
    pwrs.append(read_power())
 | 
						|
    rk.keep_time()
 | 
						|
  return pwrs
 | 
						|
 | 
						|
def get_power(seconds=5):
 | 
						|
  pwrs = sample_power(seconds)
 | 
						|
  return np.mean(pwrs)
 | 
						|
 | 
						|
def wait_for_power(min_pwr, max_pwr, min_secs_in_range, timeout):
 | 
						|
  start_time = time.monotonic()
 | 
						|
  pwrs = deque([min_pwr - 1.]*min_secs_in_range, maxlen=min_secs_in_range)
 | 
						|
  while (time.monotonic() - start_time < timeout):
 | 
						|
    pwrs.append(get_power(1))
 | 
						|
    if all(min_pwr <= p <= max_pwr for p in pwrs):
 | 
						|
      break
 | 
						|
  return np.mean(pwrs)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  duration = None
 | 
						|
  if len(sys.argv) > 1:
 | 
						|
    duration = int(sys.argv[1])
 | 
						|
 | 
						|
  rate = 23
 | 
						|
  rk = Ratekeeper(rate, print_delay_threshold=None)
 | 
						|
  fltr = FirstOrderFilter(0, 5, 1. / rate, initialized=False)
 | 
						|
 | 
						|
  measurements = []
 | 
						|
  start_time = time.monotonic()
 | 
						|
 | 
						|
  try:
 | 
						|
    while duration is None or time.monotonic() - start_time < duration:
 | 
						|
      fltr.update(read_power())
 | 
						|
      if rk.frame % rate == 0:
 | 
						|
        measurements.append(fltr.x)
 | 
						|
        t = datetime.timedelta(seconds=time.monotonic() - start_time)
 | 
						|
        avg = sum(measurements) / len(measurements)
 | 
						|
        print(f"Now: {fltr.x:.2f} W, Avg: {avg:.2f} W over {t}")
 | 
						|
      rk.keep_time()
 | 
						|
  except KeyboardInterrupt:
 | 
						|
    pass
 | 
						|
 | 
						|
  t = datetime.timedelta(seconds=time.monotonic() - start_time)
 | 
						|
  avg = sum(measurements) / len(measurements)
 | 
						|
  print(f"\nAverage power: {avg:.2f}W over {t}")
 | 
						|
 |