diff --git a/selfdrive/statsd.py b/selfdrive/statsd.py index 2e62e32536..b880c4110a 100755 --- a/selfdrive/statsd.py +++ b/selfdrive/statsd.py @@ -3,8 +3,9 @@ import os import zmq import time from pathlib import Path +from collections import defaultdict from datetime import datetime, timezone -from typing import NoReturn +from typing import NoReturn, Union, List, Dict from common.params import Params from cereal.messaging import SubMaster @@ -17,6 +18,7 @@ from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCK class METRIC_TYPE: GAUGE = 'g' + SAMPLE = 'sa' class StatLog: def __init__(self): @@ -42,14 +44,27 @@ class StatLog: def gauge(self, name: str, value: float) -> None: self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}") + # Samples will be recorded in a buffer and at aggregation time, + # statistical properties will be logged (mean, count, percentiles, ...) + def sample(self, name: str, value: float): + self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}") + def main() -> NoReturn: dongle_id = Params().get("DongleId", encoding='utf-8') - def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict) -> str: + def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str: res = f"{measurement}" for k, v in tags.items(): res += f",{k}={str(v)}" - res += f" value={value},dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n" + res += " " + + if isinstance(value, float): + value = {'value': value} + + for k, v in value.items(): + res += f"{k}={v}," + + res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n" return res # open statistics socket @@ -75,6 +90,7 @@ def main() -> NoReturn: last_flush_time = time.monotonic() gauges = {} + samples: Dict[str, List[float]] = defaultdict(list) while True: started_prev = sm['deviceState'].started sm.update() @@ -86,10 +102,12 @@ def main() -> NoReturn: try: metric_type = metric.split('|')[1] metric_name = metric.split(':')[0] - metric_value = metric.split('|')[0].split(':')[1] + metric_value = float(metric.split('|')[0].split(':')[1]) if metric_type == METRIC_TYPE.GAUGE: gauges[metric_name] = metric_value + elif metric_type == METRIC_TYPE.SAMPLE: + samples[metric_name].append(metric_value) else: cloudlog.event("unknown metric type", metric_type=metric_type) except Exception: @@ -103,11 +121,29 @@ def main() -> NoReturn: current_time = datetime.utcnow().replace(tzinfo=timezone.utc) tags['started'] = sm['deviceState'].started - for gauge_key in gauges: - result += get_influxdb_line(f"gauge.{gauge_key}", gauges[gauge_key], current_time, tags) + for key, value in gauges.items(): + result += get_influxdb_line(f"gauge.{key}", value, current_time, tags) + + for key, values in samples.items(): + values.sort() + sample_count = len(values) + sample_sum = sum(values) + + stats = { + 'count': sample_count, + 'min': values[0], + 'max': values[-1], + 'mean': sample_sum / sample_count, + } + for percentile in [0.05, 0.5, 0.95]: + value = values[int(round(percentile * (sample_count - 1)))] + stats[f"p{int(percentile * 100)}"] = value + + result += get_influxdb_line(f"sample.{key}", stats, current_time, tags) # clear intermediate data - gauges = {} + gauges.clear() + samples.clear() last_flush_time = time.monotonic() # check that we aren't filling up the drive