Add sample metric type to statsd (#23557)

* add sample stat, and make current power draw work on eon/c2

* fix power monitoring test

* little cleanup

* add type hinting to dict

* save as different values

* duh

* rip out power stat

* small fix

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* cleanup this too

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
pull/24055/head
Robbe Derks 3 years ago committed by GitHub
parent 49498aa7b5
commit d60c44e03c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 50
      selfdrive/statsd.py

@ -3,8 +3,9 @@ import os
import zmq
import time
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timezone
from typing import NoReturn
from typing import NoReturn, Union, List, Dict
from common.params import Params
from cereal.messaging import SubMaster
@ -17,6 +18,7 @@ from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCK
class METRIC_TYPE:
GAUGE = 'g'
SAMPLE = 'sa'
class StatLog:
def __init__(self):
@ -42,14 +44,27 @@ class StatLog:
def gauge(self, name: str, value: float) -> None:
self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
# Samples will be recorded in a buffer and at aggregation time,
# statistical properties will be logged (mean, count, percentiles, ...)
def sample(self, name: str, value: float):
self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
def main() -> NoReturn:
dongle_id = Params().get("DongleId", encoding='utf-8')
def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict) -> str:
def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
res = f"{measurement}"
for k, v in tags.items():
res += f",{k}={str(v)}"
res += f" value={value},dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
res += " "
if isinstance(value, float):
value = {'value': value}
for k, v in value.items():
res += f"{k}={v},"
res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
return res
# open statistics socket
@ -75,6 +90,7 @@ def main() -> NoReturn:
last_flush_time = time.monotonic()
gauges = {}
samples: Dict[str, List[float]] = defaultdict(list)
while True:
started_prev = sm['deviceState'].started
sm.update()
@ -86,10 +102,12 @@ def main() -> NoReturn:
try:
metric_type = metric.split('|')[1]
metric_name = metric.split(':')[0]
metric_value = metric.split('|')[0].split(':')[1]
metric_value = float(metric.split('|')[0].split(':')[1])
if metric_type == METRIC_TYPE.GAUGE:
gauges[metric_name] = metric_value
elif metric_type == METRIC_TYPE.SAMPLE:
samples[metric_name].append(metric_value)
else:
cloudlog.event("unknown metric type", metric_type=metric_type)
except Exception:
@ -103,11 +121,29 @@ def main() -> NoReturn:
current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
tags['started'] = sm['deviceState'].started
for gauge_key in gauges:
result += get_influxdb_line(f"gauge.{gauge_key}", gauges[gauge_key], current_time, tags)
for key, value in gauges.items():
result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
for key, values in samples.items():
values.sort()
sample_count = len(values)
sample_sum = sum(values)
stats = {
'count': sample_count,
'min': values[0],
'max': values[-1],
'mean': sample_sum / sample_count,
}
for percentile in [0.05, 0.5, 0.95]:
value = values[int(round(percentile * (sample_count - 1)))]
stats[f"p{int(percentile * 100)}"] = value
result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
# clear intermediate data
gauges = {}
gauges.clear()
samples.clear()
last_flush_time = time.monotonic()
# check that we aren't filling up the drive

Loading…
Cancel
Save