@ -3,8 +3,9 @@ import os
import zmq
import zmq
import time
import time
from pathlib import Path
from pathlib import Path
from collections import defaultdict
from datetime import datetime , timezone
from datetime import datetime , timezone
from typing import NoReturn
from typing import NoReturn , Union , List , Dict
from common . params import Params
from common . params import Params
from cereal . messaging import SubMaster
from cereal . messaging import SubMaster
@ -17,6 +18,7 @@ from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCK
class METRIC_TYPE:
    """Type tags placed after the '|' separator of a metric message."""

    # Gauge: the aggregator keeps only the most recent value per metric name.
    GAUGE = 'g'
    # Sample: every value is buffered per metric name and summarized when the
    # buffer is flushed.
    SAMPLE = 'sa'
class StatLog :
class StatLog :
def __init__ ( self ) :
def __init__ ( self ) :
@ -42,14 +44,27 @@ class StatLog:
def gauge(self, name: str, value: float) -> None:
    """Send a gauge metric for ``name``.

    The message is formatted as ``<name>:<value>|<METRIC_TYPE.GAUGE>`` and
    handed to ``self._send``.
    """
    self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
# Samples will be recorded in a buffer and at aggregation time,
# statistical properties will be logged (mean, count, percentiles, ...)
def sample(self, name: str, value: float) -> None:
    """Send a sample metric for ``name``.

    The message is formatted as ``<name>:<value>|<METRIC_TYPE.SAMPLE>`` and
    handed to ``self._send``.
    """
    self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
def main ( ) - > NoReturn :
def main ( ) - > NoReturn :
dongle_id = Params ( ) . get ( " DongleId " , encoding = ' utf-8 ' )
dongle_id = Params ( ) . get ( " DongleId " , encoding = ' utf-8 ' )
def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
    """Format one measurement as a single newline-terminated text line.

    Layout: ``<measurement>,<tag>=<v>,... <field>=<v>,...,dongle_id="<id>" <ns>``
    (presumably InfluxDB line protocol, going by the function name).

    Args:
        measurement: Name written at the start of the line.
        value: Either a bare number, emitted as the single field ``value``,
            or a mapping of field name to number, emitted in iteration order.
        timestamp: Point in time; written as integer nanoseconds since epoch.
        tags: Key/value pairs appended to the measurement name, each value
            converted with ``str``.

    Returns:
        The formatted line, including the trailing newline.

    ``dongle_id`` is read from the enclosing scope and is always emitted as
    the last, quoted field.
    """
    # A bare number is shorthand for {'value': number}. ints are accepted as
    # well as floats so an integer gauge does not fall through to .items().
    fields = {'value': value} if isinstance(value, (int, float)) else value

    tag_part = "".join(f",{k}={str(v)}" for k, v in tags.items())
    # Each field keeps its trailing comma; dongle_id closes the field list.
    field_part = "".join(f"{k}={v}," for k, v in fields.items())
    timestamp_ns = int(timestamp.timestamp() * 1e9)

    return f"{measurement}{tag_part} {field_part}dongle_id=\"{dongle_id}\" {timestamp_ns}\n"
# open statistics socket
# open statistics socket
@ -75,6 +90,7 @@ def main() -> NoReturn:
last_flush_time = time . monotonic ( )
last_flush_time = time . monotonic ( )
gauges = { }
gauges = { }
samples : Dict [ str , List [ float ] ] = defaultdict ( list )
while True :
while True :
started_prev = sm [ ' deviceState ' ] . started
started_prev = sm [ ' deviceState ' ] . started
sm . update ( )
sm . update ( )
@ -86,10 +102,12 @@ def main() -> NoReturn:
try :
try :
metric_type = metric . split ( ' | ' ) [ 1 ]
metric_type = metric . split ( ' | ' ) [ 1 ]
metric_name = metric . split ( ' : ' ) [ 0 ]
metric_name = metric . split ( ' : ' ) [ 0 ]
metric_value = metric . split ( ' | ' ) [ 0 ] . split ( ' : ' ) [ 1 ]
metric_value = float ( metric . split ( ' | ' ) [ 0 ] . split ( ' : ' ) [ 1 ] )
if metric_type == METRIC_TYPE . GAUGE :
if metric_type == METRIC_TYPE . GAUGE :
gauges [ metric_name ] = metric_value
gauges [ metric_name ] = metric_value
elif metric_type == METRIC_TYPE . SAMPLE :
samples [ metric_name ] . append ( metric_value )
else :
else :
cloudlog . event ( " unknown metric type " , metric_type = metric_type )
cloudlog . event ( " unknown metric type " , metric_type = metric_type )
except Exception :
except Exception :
@ -103,11 +121,29 @@ def main() -> NoReturn:
current_time = datetime . utcnow ( ) . replace ( tzinfo = timezone . utc )
current_time = datetime . utcnow ( ) . replace ( tzinfo = timezone . utc )
tags [ ' started ' ] = sm [ ' deviceState ' ] . started
tags [ ' started ' ] = sm [ ' deviceState ' ] . started
for gauge_key in gauges :
for key , value in gauges . items ( ) :
result + = get_influxdb_line ( f " gauge. { gauge_key } " , gauges [ gauge_key ] , current_time , tags )
result + = get_influxdb_line ( f " gauge. { key } " , value , current_time , tags )
for key , values in samples . items ( ) :
values . sort ( )
sample_count = len ( values )
sample_sum = sum ( values )
stats = {
' count ' : sample_count ,
' min ' : values [ 0 ] ,
' max ' : values [ - 1 ] ,
' mean ' : sample_sum / sample_count ,
}
for percentile in [ 0.05 , 0.5 , 0.95 ] :
value = values [ int ( round ( percentile * ( sample_count - 1 ) ) ) ]
stats [ f " p { int ( percentile * 100 ) } " ] = value
result + = get_influxdb_line ( f " sample. { key } " , stats , current_time , tags )
# clear intermediate data
# clear intermediate data
gauges = { }
gauges . clear ( )
samples . clear ( )
last_flush_time = time . monotonic ( )
last_flush_time = time . monotonic ( )
# check that we aren't filling up the drive
# check that we aren't filling up the drive