Add sample metric type to statsd (#23557)

* add sample stat, and make current power draw work on eon/c2

* fix power monitoring test

* little cleanup

* add type hinting to dict

* save as different values

* duh

* rip out power stat

* small fix

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* cleanup this too

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: d60c44e03c
taco
Robbe Derks 3 years ago committed by GitHub
parent c319dfd924
commit af38179430
  1. 50
      selfdrive/statsd.py

@ -3,8 +3,9 @@ import os
import zmq import zmq
import time import time
from pathlib import Path from pathlib import Path
from collections import defaultdict
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import NoReturn from typing import NoReturn, Union, List, Dict
from common.params import Params from common.params import Params
from cereal.messaging import SubMaster from cereal.messaging import SubMaster
@ -17,6 +18,7 @@ from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCK
class METRIC_TYPE: class METRIC_TYPE:
GAUGE = 'g' GAUGE = 'g'
SAMPLE = 'sa'
class StatLog: class StatLog:
def __init__(self): def __init__(self):
@ -42,14 +44,27 @@ class StatLog:
def gauge(self, name: str, value: float) -> None: def gauge(self, name: str, value: float) -> None:
self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}") self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
# Samples will be recorded in a buffer and at aggregation time,
# statistical properties will be logged (mean, count, percentiles, ...)
def sample(self, name: str, value: float):
self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
def main() -> NoReturn: def main() -> NoReturn:
dongle_id = Params().get("DongleId", encoding='utf-8') dongle_id = Params().get("DongleId", encoding='utf-8')
def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict) -> str: def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
res = f"{measurement}" res = f"{measurement}"
for k, v in tags.items(): for k, v in tags.items():
res += f",{k}={str(v)}" res += f",{k}={str(v)}"
res += f" value={value},dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n" res += " "
if isinstance(value, float):
value = {'value': value}
for k, v in value.items():
res += f"{k}={v},"
res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
return res return res
# open statistics socket # open statistics socket
@ -75,6 +90,7 @@ def main() -> NoReturn:
last_flush_time = time.monotonic() last_flush_time = time.monotonic()
gauges = {} gauges = {}
samples: Dict[str, List[float]] = defaultdict(list)
while True: while True:
started_prev = sm['deviceState'].started started_prev = sm['deviceState'].started
sm.update() sm.update()
@ -86,10 +102,12 @@ def main() -> NoReturn:
try: try:
metric_type = metric.split('|')[1] metric_type = metric.split('|')[1]
metric_name = metric.split(':')[0] metric_name = metric.split(':')[0]
metric_value = metric.split('|')[0].split(':')[1] metric_value = float(metric.split('|')[0].split(':')[1])
if metric_type == METRIC_TYPE.GAUGE: if metric_type == METRIC_TYPE.GAUGE:
gauges[metric_name] = metric_value gauges[metric_name] = metric_value
elif metric_type == METRIC_TYPE.SAMPLE:
samples[metric_name].append(metric_value)
else: else:
cloudlog.event("unknown metric type", metric_type=metric_type) cloudlog.event("unknown metric type", metric_type=metric_type)
except Exception: except Exception:
@ -103,11 +121,29 @@ def main() -> NoReturn:
current_time = datetime.utcnow().replace(tzinfo=timezone.utc) current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
tags['started'] = sm['deviceState'].started tags['started'] = sm['deviceState'].started
for gauge_key in gauges: for key, value in gauges.items():
result += get_influxdb_line(f"gauge.{gauge_key}", gauges[gauge_key], current_time, tags) result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
for key, values in samples.items():
values.sort()
sample_count = len(values)
sample_sum = sum(values)
stats = {
'count': sample_count,
'min': values[0],
'max': values[-1],
'mean': sample_sum / sample_count,
}
for percentile in [0.05, 0.5, 0.95]:
value = values[int(round(percentile * (sample_count - 1)))]
stats[f"p{int(percentile * 100)}"] = value
result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
# clear intermediate data # clear intermediate data
gauges = {} gauges.clear()
samples.clear()
last_flush_time = time.monotonic() last_flush_time = time.monotonic()
# check that we aren't filling up the drive # check that we aren't filling up the drive

Loading…
Cancel
Save