|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import sys
 | 
					
						
							|  |  |  | import time
 | 
					
						
							|  |  |  | import datetime
 | 
					
						
							|  |  |  | import numpy as np
 | 
					
						
							|  |  |  | from typing import List
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from common.realtime import Ratekeeper
 | 
					
						
							|  |  |  | from common.filter_simple import FirstOrderFilter
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							def read_power(path="/sys/bus/i2c/devices/0-0040/hwmon/hwmon1/power1_input"):
							  """Read one power sample and return it in watts.

							  Args:
							    path: File holding the reading as a decimal integer. Defaults to the
							      hwmon ``power1_input`` attribute this script was written for; pass
							      another path if the sensor is exposed elsewhere (presumably the
							      ``hwmonN`` index is not the same on every system - verify on target).

							  Returns:
							    The value in the file divided by 1e6, as a float. hwmon reports
							    ``power*_input`` in microwatts, so the result is in watts.

							  Raises:
							    OSError: if ``path`` cannot be opened.
							    ValueError: if the file content is not an integer.
							  """
							  with open(path) as f:
							    # int() tolerates the trailing newline sysfs attributes end with.
							    return int(f.read()) / 1e6
 | 
					
						
							|  |  |  | 
 | 
					
						
							def sample_power(seconds=5) -> List[float]:
							  """Collect consecutive power readings for roughly ``seconds`` seconds.

							  Takes ``int(123 * seconds)`` readings via ``read_power()``, calling
							  ``rk.keep_time()`` after each one (presumably this sleeps so the loop
							  runs at the Ratekeeper's rate - confirm against common.realtime).

							  Args:
							    seconds: Sampling duration. May be fractional; the sample count is
							      truncated to an integer. Zero or negative yields an empty list.

							  Returns:
							    The readings in the order they were taken.
							  """
							  rate = 123
							  rk = Ratekeeper(rate, print_delay_threshold=None)

							  pwrs = []
							  # range() only accepts integers, so truncate to support float durations.
							  for _ in range(int(rate * seconds)):
							    pwrs.append(read_power())
							    rk.keep_time()
							  return pwrs
 | 
					
						
							|  |  |  | 
 | 
					
						
							def get_power(seconds=5):
							  """Return the arithmetic mean of the readings from ``sample_power(seconds)``."""
							  return np.mean(sample_power(seconds))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							if __name__ == "__main__":
							  # Optional first CLI argument: run time in whole seconds.
							  # Without it the loop runs until interrupted.
							  duration = None
							  if len(sys.argv) > 1:
							    duration = int(sys.argv[1])

							  rate = 23
							  rk = Ratekeeper(rate, print_delay_threshold=None)
							  fltr = FirstOrderFilter(0, 5, 1. / rate, initialized=False)

							  measurements = []
							  start_time = time.monotonic()

							  try:
							    while duration is None or time.monotonic() - start_time < duration:
							      fltr.update(read_power())
							      # Record and print the filtered value once every `rate` frames.
							      if rk.frame % rate == 0:
							        measurements.append(fltr.x)
							        t = datetime.timedelta(seconds=time.monotonic() - start_time)
							        avg = sum(measurements) / len(measurements)
							        print(f"Now: {fltr.x:.2f} W, Avg: {avg:.2f} W over {t}")
							      rk.keep_time()
							  except KeyboardInterrupt:
							    # Ctrl-C is the expected way to stop an unbounded run; fall through
							    # to the summary instead of printing a traceback.
							    pass

							  t = datetime.timedelta(seconds=time.monotonic() - start_time)
							  # Nothing is recorded if the loop never ran (e.g. duration 0) or was
							  # interrupted before the first sample; avoid dividing by zero then.
							  if measurements:
							    avg = sum(measurements) / len(measurements)
							    print(f"\nAverage power: {avg:.2f}W over {t}")
							  else:
							    print(f"\nNo power measurements collected over {t}")
 |