#!/usr/bin/env python3
import sys
import time
import datetime
import numpy as np
from typing import List
from collections import deque
from openpilot . common . realtime import Ratekeeper
from openpilot . common . filter_simple import FirstOrderFilter
def read_power ( ) :
with open ( " /sys/bus/i2c/devices/0-0040/hwmon/hwmon1/power1_input " ) as f :
return int ( f . read ( ) ) / 1e6
def sample_power ( seconds = 5 ) - > List [ float ] :
rate = 123
rk = Ratekeeper ( rate , print_delay_threshold = None )
pwrs = [ ]
for _ in range ( rate * seconds ) :
pwrs . append ( read_power ( ) )
rk . keep_time ( )
return pwrs
def get_power ( seconds = 5 ) :
pwrs = sample_power ( seconds )
return np . mean ( pwrs )
def wait_for_power ( min_pwr , max_pwr , min_secs_in_range , timeout ) :
start_time = time . monotonic ( )
pwrs = deque ( [ min_pwr - 1. ] * min_secs_in_range , maxlen = min_secs_in_range )
while ( time . monotonic ( ) - start_time < timeout ) :
pwrs . append ( get_power ( 1 ) )
if all ( min_pwr < = p < = max_pwr for p in pwrs ) :
break
return np . mean ( pwrs )
if __name__ == " __main__ " :
duration = None
if len ( sys . argv ) > 1 :
duration = int ( sys . argv [ 1 ] )
rate = 23
rk = Ratekeeper ( rate , print_delay_threshold = None )
fltr = FirstOrderFilter ( 0 , 5 , 1. / rate , initialized = False )
measurements = [ ]
start_time = time . monotonic ( )
try :
while duration is None or time . monotonic ( ) - start_time < duration :
fltr . update ( read_power ( ) )
if rk . frame % rate == 0 :
measurements . append ( fltr . x )
t = datetime . timedelta ( seconds = time . monotonic ( ) - start_time )
avg = sum ( measurements ) / len ( measurements )
print ( f " Now: { fltr . x : .2f } W, Avg: { avg : .2f } W over { t } " )
rk . keep_time ( )
except KeyboardInterrupt :
pass
t = datetime . timedelta ( seconds = time . monotonic ( ) - start_time )
avg = sum ( measurements ) / len ( measurements )
print ( f " \n Average power: { avg : .2f } W over { t } " )