Statsd (#23413)
* device side of statsd
* need to start it
* enable in manager
* add sleep
* cleanup
* remove aggregates for now and standardize on industry terms
* manager needs main
* need to have a try/except
* atomic_write_on_fs_tmp does not work
* cleaner
* use dump
Co-authored-by: Willem Melching <willem.melching@gmail.com>
* one file at a time
* limit amount of files
* move to influx line protocol and cleanup
* needs to be a list
* fix timezone bug
* actually rate limit
* add to release
* normalized origin
* also log deviceType
* more stats
Co-authored-by: Willem Melching <willem.melching@gmail.com>
old-commit-hash: 1b49ce6ec4
commatwo_master
parent
75d00872e8
commit
323febbf37
8 changed files with 201 additions and 3 deletions
@ -0,0 +1,122 @@ |
||||
#!/usr/bin/env python3 |
||||
import os |
||||
import zmq |
||||
import time |
||||
from pathlib import Path |
||||
from datetime import datetime, timezone |
||||
from common.params import Params |
||||
from cereal.messaging import SubMaster |
||||
from selfdrive.swaglog import cloudlog |
||||
from selfdrive.hardware import HARDWARE |
||||
from common.file_helpers import atomic_write_in_dir |
||||
from selfdrive.version import get_normalized_origin, get_short_branch, get_short_version, is_dirty |
||||
from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S |
||||
|
||||
|
||||
class METRIC_TYPE:
  """Statsd-style metric type suffixes used in the wire format."""
  GAUGE = 'g'
||||
|
||||
class StatLog:
  """Client side of the statsd daemon.

  Metrics are pushed over a zmq PUSH socket to STATS_SOCKET. The socket is
  created lazily on first send, and re-created whenever the sending process
  pid changes (i.e. after a fork), since zmq sockets must not be shared
  across processes.
  """

  def __init__(self):
    # pid of the process that owns the current socket; None until first send
    self.pid = None

  def connect(self) -> None:
    """(Re)create the zmq context and PUSH socket for the current process."""
    self.zctx = zmq.Context()
    self.sock = self.zctx.socket(zmq.PUSH)
    # don't hang process shutdown on unsent metrics
    self.sock.setsockopt(zmq.LINGER, 10)
    self.sock.connect(STATS_SOCKET)
    self.pid = os.getpid()

  def _send(self, metric: str) -> None:
    # reconnect if this is the first send or we've been forked since the last one
    if self.pid != os.getpid():
      self.connect()

    try:
      self.sock.send_string(metric, zmq.NOBLOCK)
    except zmq.error.Again:
      # receiver is slow or absent — drop the metric rather than block the caller
      pass

  def gauge(self, name: str, value: float) -> None:
    """Record an instantaneous value for *name* (last write before flush wins)."""
    self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
||||
|
||||
|
||||
def main():
  """Statsd daemon loop.

  Pulls metrics from STATS_SOCKET, aggregates gauges in memory, and flushes
  them to files in STATS_DIR (InfluxDB line protocol, one file per flush,
  named by the flush's unix timestamp) every STATS_FLUSH_TIME_S or whenever
  the device's started state changes. Runs forever.
  """
  def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict) -> str:
    # format: measurement,tag1=v1,tag2=v2 value=<value> <ns-timestamp>\n
    # NOTE(review): tag keys/values are not escaped — assumes they never
    # contain ',', '=' or spaces; TODO confirm against the tag sources.
    res = measurement
    for tag_key, tag_value in tags.items():
      res += f",{tag_key}={str(tag_value)}"
    res += f" value={value} {int(timestamp.timestamp() * 1e9)}\n"
    return res

  # open statistics socket
  ctx = zmq.Context().instance()
  sock = ctx.socket(zmq.PULL)
  sock.bind(STATS_SOCKET)

  # initialize stats directory
  Path(STATS_DIR).mkdir(parents=True, exist_ok=True)

  # tags attached to every measurement
  tags = {
    'dongleId': Params().get("DongleId", encoding='utf-8'),
    'started': False,
    'version': get_short_version(),
    'branch': get_short_branch(),
    'dirty': is_dirty(),
    'origin': get_normalized_origin(),
    'deviceType': HARDWARE.get_device_type(),
  }

  # subscribe to deviceState for started state
  sm = SubMaster(['deviceState'])

  last_flush_time = time.monotonic()
  gauges = {}
  while True:
    # drain one metric if available; otherwise sleep briefly to avoid spinning
    try:
      metric = sock.recv_string(zmq.NOBLOCK)
      try:
        # wire format: "<name>:<value>|<type>" — parse once; any failure
        # (missing ':' or '|') raises and is logged as malformed
        payload, metric_type = metric.split('|', 1)
        metric_name, metric_value = payload.split(':', 1)

        if metric_type == METRIC_TYPE.GAUGE:
          gauges[metric_name] = metric_value
        else:
          cloudlog.event("unknown metric type", metric_type=metric_type)
      except Exception:
        cloudlog.event("malformed metric", metric=metric)
    except zmq.error.Again:
      time.sleep(1e-3)

    started_prev = sm['deviceState'].started
    sm.update(0)

    # flush when started state changes or after STATS_FLUSH_TIME_S
    if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
      # timezone-aware UTC timestamp (datetime.utcnow() is naive and deprecated)
      current_time = datetime.now(timezone.utc)
      tags['started'] = sm['deviceState'].started

      result = "".join(
        get_influxdb_line(f"gauge.{key}", value, current_time, tags)
        for key, value in gauges.items()
      )

      # clear intermediate data
      gauges = {}
      last_flush_time = time.monotonic()

      # check that we aren't filling up the drive
      if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
        if len(result) > 0:
          stats_path = os.path.join(STATS_DIR, str(int(current_time.timestamp())))
          with atomic_write_in_dir(stats_path) as f:
            f.write(result)
      else:
        cloudlog.error("stats dir full")
||||
|
||||
|
||||
if __name__ == "__main__":
  # run as the statsd daemon process
  main()
else:
  # imported as a library: expose a process-wide client so callers can
  # report metrics via statlog.gauge(...)
  statlog = StatLog()
Loading…
Reference in new issue