#!/usr/bin/env python3
import os
import zmq
import time
from pathlib import Path
from collections import defaultdict
from datetime import datetime , timezone
from typing import NoReturn , Union , List , Dict
from common . params import Params
from cereal . messaging import SubMaster
from selfdrive . swaglog import cloudlog
from selfdrive . hardware import HARDWARE
from common . file_helpers import atomic_write_in_dir
from selfdrive . version import get_normalized_origin , get_short_branch , get_short_version , is_dirty
from selfdrive . loggerd . config import STATS_DIR , STATS_DIR_FILE_LIMIT , STATS_SOCKET , STATS_FLUSH_TIME_S
class METRIC_TYPE:
    """Type suffixes carried after the ``|`` in a ``name:value|type`` metric string."""

    # Gauge: the receiver keeps only the most recent value per metric name.
    GAUGE = 'g'

    # Sample: the receiver buffers every value and reports aggregate statistics.
    SAMPLE = 'sa'
class StatLog:
    """Client that pushes metrics to ``STATS_SOCKET`` over a ZMQ PUSH socket.

    The socket is created lazily: nothing is opened until the first metric is
    sent, and it is re-created whenever the current PID differs from the one
    recorded at connect time (such as in a child process that inherited this
    object).
    """

    def __init__(self):
        # PID that owns the current socket; None until connect() has run.
        self.pid = None
        # Declared up front so the attributes always exist, even before the
        # first connect().
        self.zctx = None
        self.sock = None

    def connect(self) -> None:
        """Open a fresh context and PUSH socket and remember the owning PID."""
        self.zctx = zmq.Context()
        self.sock = self.zctx.socket(zmq.PUSH)
        self.sock.setsockopt(zmq.LINGER, 10)
        self.sock.connect(STATS_SOCKET)
        self.pid = os.getpid()

    def _send(self, metric: str) -> None:
        """Send one already-formatted metric string without blocking.

        Metrics are best-effort: if the socket cannot accept the message
        right now, the metric is dropped.
        """
        if os.getpid() != self.pid:
            self.connect()

        try:
            self.sock.send_string(metric, zmq.NOBLOCK)
        except zmq.error.Again:
            # drop :/
            pass

    def gauge(self, name: str, value: float) -> None:
        """Report the current value of gauge ``name``."""
        self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")

    # Samples will be recorded in a buffer and at aggregation time,
    # statistical properties will be logged (mean, count, percentiles, ...)
    def sample(self, name: str, value: float) -> None:
        """Report one sample of metric ``name``."""
        self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
def _parse_metric(metric: str):
    """Split a ``name:value|type`` string into ``(name, value, type)``.

    Raises:
        IndexError: the ``|`` or ``:`` separator is missing.
        ValueError: the value part is not a valid float literal.
    """
    fields = metric.split('|')
    name_and_value = fields[0].split(':')
    return name_and_value[0], float(name_and_value[1]), fields[1]


def _summarize_samples(values: List[float]) -> Dict[str, float]:
    """Reduce a non-empty list of samples to count, min, max, mean, p5, p50 and p95.

    ``values`` is sorted in place.
    """
    values.sort()
    sample_count = len(values)
    stats = {
        'count': sample_count,
        'min': values[0],
        'max': values[-1],
        'mean': sum(values) / sample_count,
    }
    for percentile in (0.05, 0.5, 0.95):
        # index of this percentile within the sorted samples
        index = int(round(percentile * (sample_count - 1)))
        # round() instead of int(): percentile * 100 is not guaranteed to be
        # exact in binary floating point, and int() would truncate downwards.
        stats[f"p{round(percentile * 100)}"] = values[index]
    return stats


def main() -> NoReturn:
    """Run the stats daemon.

    Binds a PULL socket on ``STATS_SOCKET``, collects the gauge and sample
    metrics received on it, and writes them as text lines to a new file in
    ``STATS_DIR`` every ``STATS_FLUSH_TIME_S`` seconds or whenever
    ``deviceState.started`` changes. Never returns.
    """
    dongle_id = Params().get("DongleId", encoding='utf-8')

    def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
        """Format one output line: measurement, tags, fields, dongle_id, ns timestamp."""
        tag_part = "".join(f",{k}={str(v)}" for k, v in tags.items())
        # A bare number becomes a single field called 'value'. Testing for
        # dict (rather than float) also accepts ints without crashing.
        fields = value if isinstance(value, dict) else {'value': value}
        field_part = "".join(f"{k}={v}," for k, v in fields.items())
        return f"{measurement}{tag_part} {field_part}dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"

    # open statistics socket
    # instance() is a classmethod; calling it on the class avoids creating a
    # throwaway Context first.
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.PULL)
    sock.bind(STATS_SOCKET)

    # initialize stats directory
    Path(STATS_DIR).mkdir(parents=True, exist_ok=True)

    # initialize tags
    tags = {
        'started': False,
        'version': get_short_version(),
        'branch': get_short_branch(),
        'dirty': is_dirty(),
        'origin': get_normalized_origin(),
        'deviceType': HARDWARE.get_device_type(),
    }

    # subscribe to deviceState for started state
    sm = SubMaster(['deviceState'])

    last_flush_time = time.monotonic()
    gauges: Dict[str, float] = {}
    samples: Dict[str, List[float]] = defaultdict(list)
    while True:
        started_prev = sm['deviceState'].started
        sm.update()

        # Update metrics: drain everything currently queued on the socket
        while True:
            try:
                metric = sock.recv_string(zmq.NOBLOCK)
            except zmq.error.Again:
                break

            try:
                metric_name, metric_value, metric_type = _parse_metric(metric)
            except (IndexError, ValueError):
                cloudlog.event("malformed metric", metric=metric)
                continue

            if metric_type == METRIC_TYPE.GAUGE:
                gauges[metric_name] = metric_value
            elif metric_type == METRIC_TYPE.SAMPLE:
                samples[metric_name].append(metric_value)
            else:
                cloudlog.event("unknown metric type", metric_type=metric_type)

        # flush when started state changes or after FLUSH_TIME_S
        started = sm['deviceState'].started
        flush_due = time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S
        if not flush_due and started == started_prev:
            continue

        current_time = datetime.now(timezone.utc)
        tags['started'] = started

        lines = [
            get_influxdb_line(f"gauge.{key}", value, current_time, tags)
            for key, value in gauges.items()
        ]
        lines += [
            get_influxdb_line(f"sample.{key}", _summarize_samples(values), current_time, tags)
            for key, values in samples.items()
        ]
        result = "".join(lines)

        # clear intermediate data
        gauges.clear()
        samples.clear()
        last_flush_time = time.monotonic()

        # check that we aren't filling up the drive
        if len(os.listdir(STATS_DIR)) >= STATS_DIR_FILE_LIMIT:
            cloudlog.error("stats dir full")
            continue

        if not result:
            continue

        # The file name is the flush time in whole seconds. A flush forced by
        # a started-state change can fall in the same second as the previous
        # one, so add a numeric suffix instead of overwriting that file.
        base_path = os.path.join(STATS_DIR, str(int(current_time.timestamp())))
        stats_path = base_path
        suffix = 0
        while os.path.exists(stats_path):
            suffix += 1
            stats_path = f"{base_path}_{suffix}"

        with atomic_write_in_dir(stats_path) as f:
            f.write(result)
# Imported as a module: expose a shared client instance for callers to use.
# Executed as a script: run the stats daemon instead.
if __name__ != "__main__":
    statlog = StatLog()
else:
    main()