@@ -23,6 +23,8 @@ class METRIC_TYPE:
 class StatLog:
   def __init__(self):
     self.pid = None
+    self.zctx = None
+    self.sock = None

   def connect(self) -> None:
     self.zctx = zmq.Context()
@@ -31,6 +33,12 @@ class StatLog:
     self.sock.connect(STATS_SOCKET)
     self.pid = os.getpid()

+  def __del__(self):
+    if self.sock is not None:
+      self.sock.close()
+    if self.zctx is not None:
+      self.zctx.term()
+
   def _send(self, metric: str) -> None:
     if os.getpid() != self.pid:
       self.connect()
@@ -68,7 +76,7 @@ def main() -> NoReturn:
     return res

   # open statistics socket
-  ctx = zmq.Context().instance()
+  ctx = zmq.Context.instance()
   sock = ctx.socket(zmq.PULL)
   sock.bind(STATS_SOCKET)

@@ -92,70 +100,74 @@ def main() -> NoReturn:
   last_flush_time = time.monotonic()
   gauges = {}
   samples: Dict[str, List[float]] = defaultdict(list)
-  while True:
-    started_prev = sm['deviceState'].started
-    sm.update()
-
-    # Update metrics
+  try:
     while True:
-      try:
-        metric = sock.recv_string(zmq.NOBLOCK)
+      started_prev = sm['deviceState'].started
+      sm.update()
+
+      # Update metrics
+      while True:
         try:
-          metric_type = metric.split('|')[1]
-          metric_name = metric.split(':')[0]
-          metric_value = float(metric.split('|')[0].split(':')[1])
-
-          if metric_type == METRIC_TYPE.GAUGE:
-            gauges[metric_name] = metric_value
-          elif metric_type == METRIC_TYPE.SAMPLE:
-            samples[metric_name].append(metric_value)
-          else:
-            cloudlog.event("unknown metric type", metric_type=metric_type)
-        except Exception:
-          cloudlog.event("malformed metric", metric=metric)
-      except zmq.error.Again:
-        break
-
-    # flush when started state changes or after FLUSH_TIME_S
-    if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
-      result = ""
-      current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
-      tags['started'] = sm['deviceState'].started
-
-      for key, value in gauges.items():
-        result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
-
-      for key, values in samples.items():
-        values.sort()
-        sample_count = len(values)
-        sample_sum = sum(values)
-
-        stats = {
-          'count': sample_count,
-          'min': values[0],
-          'max': values[-1],
-          'mean': sample_sum / sample_count,
-        }
-        for percentile in [0.05, 0.5, 0.95]:
-          value = values[int(round(percentile * (sample_count - 1)))]
-          stats[f"p{int(percentile * 100)}"] = value
-
-        result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
-
-      # clear intermediate data
-      gauges.clear()
-      samples.clear()
-      last_flush_time = time.monotonic()
-
-      # check that we aren't filling up the drive
-      if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
-        if len(result) > 0:
-          stats_path = os.path.join(STATS_DIR, f"{current_time.timestamp():.0f}_{idx}")
-          with atomic_write_in_dir(stats_path) as f:
-            f.write(result)
-          idx += 1
-      else:
-        cloudlog.error("stats dir full")
+          metric = sock.recv_string(zmq.NOBLOCK)
+          try:
+            metric_type = metric.split('|')[1]
+            metric_name = metric.split(':')[0]
+            metric_value = float(metric.split('|')[0].split(':')[1])
+
+            if metric_type == METRIC_TYPE.GAUGE:
+              gauges[metric_name] = metric_value
+            elif metric_type == METRIC_TYPE.SAMPLE:
+              samples[metric_name].append(metric_value)
+            else:
+              cloudlog.event("unknown metric type", metric_type=metric_type)
+          except Exception:
+            cloudlog.event("malformed metric", metric=metric)
+        except zmq.error.Again:
+          break
+
+      # flush when started state changes or after FLUSH_TIME_S
+      if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
+        result = ""
+        current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
+        tags['started'] = sm['deviceState'].started
+
+        for key, value in gauges.items():
+          result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
+
+        for key, values in samples.items():
+          values.sort()
+          sample_count = len(values)
+          sample_sum = sum(values)
+
+          stats = {
+            'count': sample_count,
+            'min': values[0],
+            'max': values[-1],
+            'mean': sample_sum / sample_count,
+          }
+          for percentile in [0.05, 0.5, 0.95]:
+            value = values[int(round(percentile * (sample_count - 1)))]
+            stats[f"p{int(percentile * 100)}"] = value
+
+          result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
+
+        # clear intermediate data
+        gauges.clear()
+        samples.clear()
+        last_flush_time = time.monotonic()
+
+        # check that we aren't filling up the drive
+        if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
+          if len(result) > 0:
+            stats_path = os.path.join(STATS_DIR, f"{current_time.timestamp():.0f}_{idx}")
+            with atomic_write_in_dir(stats_path) as f:
+              f.write(result)
+            idx += 1
+        else:
+          cloudlog.error("stats dir full")
+  finally:
+    sock.close()
+    ctx.term()


 if __name__ == "__main__":
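
Note (illustration, not part of the patch): the one-line change in the third hunk matters because pyzmq's Context.instance() is a classmethod returning the process-wide singleton context. The old spelling, zmq.Context().instance(), first builds a brand-new Context and only then asks for the singleton, so the extra context is never used and never terminated. A minimal sketch of the difference, assuming only pyzmq's public API:

import zmq

singleton = zmq.Context.instance()   # shared, process-wide context
extra = zmq.Context()                # what the old spelling implicitly created

assert extra.instance() is singleton  # instance() ignores the receiver...
assert extra is not singleton         # ...so `extra` is simply a leaked context

extra.term()
singleton.term()

The try/finally and __del__ added in the other hunks close the PULL/PUSH sockets and terminate their contexts on shutdown for the same reason.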