@ -3,8 +3,9 @@ import os 
			
		
	
		
			
				
					import  zmq  
			
		
	
		
			
				
					import  time  
			
		
	
		
			
				
					from  pathlib  import  Path  
			
		
	
		
			
				
					from  collections  import  defaultdict  
			
		
	
		
			
				
					from  datetime  import  datetime ,  timezone  
			
		
	
		
			
				
					from  typing  import  NoReturn  
			
		
	
		
			
				
					from  typing  import  NoReturn ,  Union ,  List ,  Dict  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					from  common . params  import  Params  
			
		
	
		
			
				
					from  cereal . messaging  import  SubMaster  
			
		
	
	
		
			
				
					
						
						
						
							
								 
						
					 
				
				@ -17,6 +18,7 @@ from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCK 
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					class  METRIC_TYPE :  
			
		
	
		
			
				
					  GAUGE  =  ' g '   
			
		
	
		
			
				
					  SAMPLE  =  ' sa '   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					class  StatLog :  
			
		
	
		
			
				
					  def  __init__ ( self ) :   
			
		
	
	
		
			
				
					
						
						
						
							
								 
						
					 
				
				@ -42,14 +44,27 @@ class StatLog: 
			
		
	
		
			
				
					  def  gauge ( self ,  name :  str ,  value :  float )  - >  None :   
			
		
	
		
			
				
					    self . _send ( f " { name } : { value } | { METRIC_TYPE . GAUGE } " )   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					  # Samples will be recorded in a buffer and at aggregation time,   
			
		
	
		
			
				
					  # statistical properties will be logged (mean, count, percentiles, ...)   
			
		
	
		
			
				
					  def  sample ( self ,  name :  str ,  value :  float ) :   
			
		
	
		
			
				
					    self . _send ( f " { name } : { value } | { METRIC_TYPE . SAMPLE } " )   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					def  main ( )  - >  NoReturn :  
			
		
	
		
			
				
					  dongle_id  =  Params ( ) . get ( " DongleId " ,  encoding = ' utf-8 ' )   
			
		
	
		
			
				
					  def  get_influxdb_line ( measurement :  str ,  value :  float ,  timestamp :  datetime ,  tags :  dict )  - >  str :   
			
		
	
		
			
				
					  def  get_influxdb_line ( measurement :  str ,  value :  Union [ float ,  Dict [ str ,  float ] ] ,    timestamp :  datetime ,  tags :  dict )  - >  str :   
			
		
	
		
			
				
					    res  =  f " { measurement } "   
			
		
	
		
			
				
					    for  k ,  v  in  tags . items ( ) :   
			
		
	
		
			
				
					      res  + =  f " , { k } = { str ( v ) } "   
			
		
	
		
			
				
					    res  + =  f "  value= { value } ,dongle_id= \" { dongle_id } \"   { int ( timestamp . timestamp ( )  *  1e9 ) } \n "   
			
		
	
		
			
				
					    res  + =  "   "   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					    if  isinstance ( value ,  float ) :   
			
		
	
		
			
				
					      value  =  { ' value ' :  value }   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					    for  k ,  v  in  value . items ( ) :   
			
		
	
		
			
				
					      res  + =  f " { k } = { v } , "   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					    res  + =  f " dongle_id= \" { dongle_id } \"   { int ( timestamp . timestamp ( )  *  1e9 ) } \n "   
			
		
	
		
			
				
					    return  res   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					  # open statistics socket   
			
		
	
	
		
			
				
					
						
						
						
							
								 
						
					 
				
				@ -75,6 +90,7 @@ def main() -> NoReturn: 
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					  last_flush_time  =  time . monotonic ( )   
			
		
	
		
			
				
					  gauges  =  { }   
			
		
	
		
			
				
					  samples :  Dict [ str ,  List [ float ] ]  =  defaultdict ( list )   
			
		
	
		
			
				
					  while  True :   
			
		
	
		
			
				
					    started_prev  =  sm [ ' deviceState ' ] . started   
			
		
	
		
			
				
					    sm . update ( )   
			
		
	
	
		
			
				
					
						
						
						
							
								 
						
					 
				
				@ -86,10 +102,12 @@ def main() -> NoReturn: 
			
		
	
		
			
				
					        try :   
			
		
	
		
			
				
					          metric_type  =  metric . split ( ' | ' ) [ 1 ]   
			
		
	
		
			
				
					          metric_name  =  metric . split ( ' : ' ) [ 0 ]   
			
		
	
		
			
				
					          metric_value  =  metric . split ( ' | ' ) [ 0 ] . split ( ' : ' ) [ 1 ]   
			
		
	
		
			
				
					          metric_value  =  float ( metric . split ( ' | ' ) [ 0 ] . split ( ' : ' ) [ 1 ] )   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					          if  metric_type  ==  METRIC_TYPE . GAUGE :   
			
		
	
		
			
				
					            gauges [ metric_name ]  =  metric_value   
			
		
	
		
			
				
					          elif  metric_type  ==  METRIC_TYPE . SAMPLE :   
			
		
	
		
			
				
					            samples [ metric_name ] . append ( metric_value )   
			
		
	
		
			
				
					          else :   
			
		
	
		
			
				
					            cloudlog . event ( " unknown metric type " ,  metric_type = metric_type )   
			
		
	
		
			
				
					        except  Exception :   
			
		
	
	
		
			
				
					
						
						
						
							
								 
						
					 
				
				@ -103,11 +121,29 @@ def main() -> NoReturn: 
			
		
	
		
			
				
					      current_time  =  datetime . utcnow ( ) . replace ( tzinfo = timezone . utc )   
			
		
	
		
			
				
					      tags [ ' started ' ]  =  sm [ ' deviceState ' ] . started   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					      for  gauge_key  in  gauges :   
			
		
	
		
			
				
					        result  + =  get_influxdb_line ( f " gauge. { gauge_key } " ,  gauges [ gauge_key ] ,  current_time ,  tags )   
			
		
	
		
			
				
					      for  key ,  value  in  gauges . items ( ) :   
			
		
	
		
			
				
					        result  + =  get_influxdb_line ( f " gauge. { key } " ,  value ,  current_time ,  tags )   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					      for  key ,  values  in  samples . items ( ) :   
			
		
	
		
			
				
					        values . sort ( )   
			
		
	
		
			
				
					        sample_count  =  len ( values )   
			
		
	
		
			
				
					        sample_sum  =  sum ( values )   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					        stats  =  {   
			
		
	
		
			
				
					          ' count ' :  sample_count ,   
			
		
	
		
			
				
					          ' min ' :  values [ 0 ] ,   
			
		
	
		
			
				
					          ' max ' :  values [ - 1 ] ,   
			
		
	
		
			
				
					          ' mean ' :  sample_sum  /  sample_count ,   
			
		
	
		
			
				
					        }   
			
		
	
		
			
				
					        for  percentile  in  [ 0.05 ,  0.5 ,  0.95 ] :   
			
		
	
		
			
				
					          value  =  values [ int ( round ( percentile  *  ( sample_count  -  1 ) ) ) ]   
			
		
	
		
			
				
					          stats [ f " p { int ( percentile  *  100 ) } " ]  =  value   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					        result  + =  get_influxdb_line ( f " sample. { key } " ,  stats ,  current_time ,  tags )   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					      # clear intermediate data   
			
		
	
		
			
				
					      gauges  =  { }   
			
		
	
		
			
				
					      gauges . clear ( )   
			
		
	
		
			
				
					      samples . clear ( )   
			
		
	
		
			
				
					      last_flush_time  =  time . monotonic ( )   
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					      # check that we aren't filling up the drive