#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  __future__  import  annotations 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  base64 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  hashlib 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  io 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  json 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  queue 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  random 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  select 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  socket 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  sys 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  tempfile 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  threading 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  dataclasses  import  asdict ,  dataclass ,  replace 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  datetime  import  datetime 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  functools  import  partial ,  total_ordering 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  queue  import  Queue 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  typing  import  cast 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  collections . abc  import  Callable 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  requests 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  jsonrpc  import  JSONRPCResponseManager ,  dispatcher 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  websocket  import  ( ABNF ,  WebSocket ,  WebSocketException ,  WebSocketTimeoutException , 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                       create_connection ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal . services  import  SERVICE_LIST 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . api  import  Api 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . file_helpers  import  CallbackReader ,  get_upload_stream 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . realtime  import  set_core_affinity 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . hardware  import  HARDWARE ,  PC 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . loggerd . xattr_cache  import  getxattr ,  setxattr 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . swaglog  import  cloudlog 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . version  import  get_build_metadata 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . hardware . hw  import  Paths 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								ATHENA_HOST  =  os . getenv ( ' ATHENA_HOST ' ,  ' wss://athena.comma.ai ' ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								HANDLER_THREADS  =  int ( os . getenv ( ' HANDLER_THREADS ' ,  " 4 " ) ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								LOCAL_PORT_WHITELIST  =  { 22 ,  }   # SSH 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								LOG_ATTR_NAME  =  ' user.upload ' 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								LOG_ATTR_VALUE_MAX_UNIX_TIME  =  int . to_bytes ( 2147483647 ,  4 ,  sys . byteorder ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								RECONNECT_TIMEOUT_S  =  70 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								RETRY_DELAY  =  10   # seconds 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								MAX_RETRY_COUNT  =  30   # Try for at most 5 minutes if upload fails immediately 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								MAX_AGE  =  31  *  24  *  3600   # seconds 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								WS_FRAME_SIZE  =  4096 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								DEVICE_STATE_UPDATE_INTERVAL  =  1.0   # in seconds 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								DEFAULT_UPLOAD_PRIORITY  =  99   # higher number = lower priority 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								NetworkType  =  log . DeviceState . NetworkType 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								UploadFileDict  =  dict [ str ,  str  |  int  |  float  |  bool ] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								UploadItemDict  =  dict [ str ,  str  |  bool  |  int  |  float  |  dict [ str ,  str ] ] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								UploadFilesToUrlResponse  =  dict [ str ,  int  |  list [ UploadItemDict ]  |  list [ str ] ] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dataclass 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  UploadFile : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  fn :  str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  url :  str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  headers :  dict [ str ,  str ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  allow_cellular :  bool 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  priority :  int  =  DEFAULT_UPLOAD_PRIORITY 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  from_dict ( cls ,  d :  dict )  - >  UploadFile : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  cls ( d . get ( " fn " ,  " " ) ,  d . get ( " url " ,  " " ) ,  d . get ( " headers " ,  { } ) ,  d . get ( " allow_cellular " ,  False ) ,  d . get ( " priority " ,  DEFAULT_UPLOAD_PRIORITY ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dataclass 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								@total_ordering 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  UploadItem : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  path :  str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  url :  str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  headers :  dict [ str ,  str ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  created_at :  int 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  id :  str  |  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  retry_count :  int  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  current :  bool  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  progress :  float  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  allow_cellular :  bool  =  False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  priority :  int  =  DEFAULT_UPLOAD_PRIORITY 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  from_dict ( cls ,  d :  dict )  - >  UploadItem : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  cls ( d [ " path " ] ,  d [ " url " ] ,  d [ " headers " ] ,  d [ " created_at " ] ,  d [ " id " ] ,  d [ " retry_count " ] ,  d [ " current " ] , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								               d [ " progress " ] ,  d [ " allow_cellular " ] ,  d [ " priority " ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __lt__ ( self ,  other ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  not  isinstance ( other ,  UploadItem ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  NotImplemented 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . priority  <  other . priority 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __eq__ ( self ,  other ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  not  isinstance ( other ,  UploadItem ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  NotImplemented 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . priority  ==  other . priority 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								dispatcher [ " echo " ]  =  lambda  s :  s 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								recv_queue :  Queue [ str ]  =  queue . Queue ( ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								send_queue :  Queue [ str ]  =  queue . Queue ( ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								upload_queue :  Queue [ UploadItem ]  =  queue . PriorityQueue ( ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								low_priority_send_queue :  Queue [ str ]  =  queue . Queue ( ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								log_recv_queue :  Queue [ str ]  =  queue . Queue ( ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								cancelled_uploads :  set [ str ]  =  set ( ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								cur_upload_items :  dict [ int ,  UploadItem  |  None ]  =  { } 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  strip_zst_extension ( fn :  str )  - >  str : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  fn . endswith ( ' .zst ' ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  fn [ : - 4 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  fn 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  AbortTransferException ( Exception ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  UploadQueueCache : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @staticmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  initialize ( upload_queue :  Queue [ UploadItem ] )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      upload_queue_json  =  Params ( ) . get ( " AthenadUploadQueue " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  upload_queue_json  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  item  in  json . loads ( upload_queue_json ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          upload_queue . put ( UploadItem . from_dict ( item ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athena.UploadQueueCache.initialize.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @staticmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  cache ( upload_queue :  Queue [ UploadItem ] )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      queue :  list [ UploadItem  |  None ]  =  list ( upload_queue . queue ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      items  =  [ asdict ( i )  for  i  in  queue  if  i  is  not  None  and  ( i . id  not  in  cancelled_uploads ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      Params ( ) . put ( " AthenadUploadQueue " ,  json . dumps ( items ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athena.UploadQueueCache.cache.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  handle_long_poll ( ws :  WebSocket ,  exit_event :  threading . Event  |  None )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  end_event  =  threading . Event ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  threads  =  [ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = ws_manage ,  args = ( ws ,  end_event ) ,  name = ' ws_manage ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = ws_recv ,  args = ( ws ,  end_event ) ,  name = ' ws_recv ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = ws_send ,  args = ( ws ,  end_event ) ,  name = ' ws_send ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = upload_handler ,  args = ( end_event , ) ,  name = ' upload_handler ' ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    threading . Thread ( target = upload_handler ,  args = ( end_event , ) ,  name = ' upload_handler2 ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = upload_handler ,  args = ( end_event , ) ,  name = ' upload_handler3 ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = upload_handler ,  args = ( end_event , ) ,  name = ' upload_handler4 ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = log_handler ,  args = ( end_event , ) ,  name = ' log_handler ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = stat_handler ,  args = ( end_event , ) ,  name = ' stat_handler ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ]  +  [ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = jsonrpc_handler ,  args = ( end_event , ) ,  name = f ' worker_ { x } ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  x  in  range ( HANDLER_THREADS ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  thread  in  threads : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    thread . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    while  not  end_event . wait ( 0.1 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  exit_event  is  not  None  and  exit_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  except  ( KeyboardInterrupt ,  SystemExit ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  finally : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  thread  in  threads : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . debug ( f " athena.joining  { thread . name } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      thread . join ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  jsonrpc_handler ( end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  dispatcher [ " startLocalProxy " ]  =  partial ( startLocalProxy ,  end_event ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      data  =  recv_queue . get ( timeout = 1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  " method "  in  data : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . event ( " athena.jsonrpc_handler.call_method " ,  data = data ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        response  =  JSONRPCResponseManager . handle ( data ,  dispatcher ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        send_queue . put_nowait ( response . json ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  " id "  in  data  and  ( " result "  in  data  or  " error "  in  data ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        log_recv_queue . put_nowait ( data ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        raise  Exception ( " not a valid request or response " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  queue . Empty : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception  as  e : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athena jsonrpc handler failed " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      send_queue . put_nowait ( json . dumps ( { " error " :  str ( e ) } ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  retry_upload ( tid :  int ,  end_event :  threading . Event ,  increase_count :  bool  =  True )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  item  =  cur_upload_items [ tid ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  item  is  not  None  and  item . retry_count  <  MAX_RETRY_COUNT : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    new_retry_count  =  item . retry_count  +  1  if  increase_count  else  item . retry_count 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    item  =  replace ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      item , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      retry_count = new_retry_count , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      progress = 0 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      current = False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    upload_queue . put_nowait ( item ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    UploadQueueCache . cache ( upload_queue ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cur_upload_items [ tid ]  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  _  in  range ( RETRY_DELAY ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      time . sleep ( 1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  cb ( sm ,  item ,  tid ,  end_event :  threading . Event ,  sz :  int ,  cur :  int )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # Abort transfer if connection changed to metered after starting upload 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # or if athenad is shutting down to re-connect the websocket 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  if  not  item . allow_cellular : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ( time . monotonic ( )  -  sm . recv_time [ ' deviceState ' ] )  >  DEVICE_STATE_UPDATE_INTERVAL : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sm . update ( 0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  sm [ ' deviceState ' ] . networkMetered : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        raise  AbortTransferException 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  AbortTransferException 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  cur_upload_items [ tid ]  =  replace ( item ,  progress = cur  /  sz  if  sz  else  1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  upload_handler ( end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  sm  =  messaging . SubMaster ( [ ' deviceState ' ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  tid  =  threading . get_ident ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cur_upload_items [ tid ]  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      cur_upload_items [ tid ]  =  item  =  replace ( upload_queue . get ( timeout = 1 ) ,  current = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  item . id  in  cancelled_uploads : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cancelled_uploads . remove ( item . id ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Remove item if too old 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      age  =  datetime . now ( )  -  datetime . fromtimestamp ( item . created_at  /  1000 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  age . total_seconds ( )  >  MAX_AGE : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . event ( " athena.upload_handler.expired " ,  item = item ,  error = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Check if uploading over metered connection is allowed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sm . update ( 0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      metered  =  sm [ ' deviceState ' ] . networkMetered 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      network_type  =  sm [ ' deviceState ' ] . networkType . raw 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  metered  and  ( not  item . allow_cellular ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        retry_upload ( tid ,  end_event ,  False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        fn  =  item . path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          sz  =  os . path . getsize ( fn ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        except  OSError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          sz  =  - 1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . event ( " athena.upload_handler.upload_start " ,  fn = fn ,  sz = sz ,  network_type = network_type ,  metered = metered ,  retry_count = item . retry_count ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        with  _do_upload ( item ,  partial ( cb ,  sm ,  item ,  tid ,  end_event ) )  as  response : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  response . status_code  not  in  ( 200 ,  201 ,  401 ,  403 ,  412 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            cloudlog . event ( " athena.upload_handler.retry " ,  status_code = response . status_code ,  fn = fn ,  sz = sz ,  network_type = network_type ,  metered = metered ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            retry_upload ( tid ,  end_event ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            cloudlog . event ( " athena.upload_handler.success " ,  fn = fn ,  sz = sz ,  network_type = network_type ,  metered = metered ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        UploadQueueCache . cache ( upload_queue ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  ( requests . exceptions . Timeout ,  requests . exceptions . ConnectionError ,  requests . exceptions . SSLError ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . event ( " athena.upload_handler.timeout " ,  fn = fn ,  sz = sz ,  network_type = network_type ,  metered = metered ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        retry_upload ( tid ,  end_event ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  AbortTransferException : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . event ( " athena.upload_handler.abort " ,  fn = fn ,  sz = sz ,  network_type = network_type ,  metered = metered ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        retry_upload ( tid ,  end_event ,  False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  queue . Empty : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athena.upload_handler.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  _do_upload ( upload_item :  UploadItem ,  callback :  Callable  =  None )  - >  requests . Response : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  path  =  upload_item . path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  compress  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  # If file does not exist, but does exist without the .zst extension we will compress on the fly 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  not  os . path . exists ( path )  and  os . path . exists ( strip_zst_extension ( path ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    path  =  strip_zst_extension ( path ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    compress  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  stream  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    stream ,  content_length  =  get_upload_stream ( path ,  compress ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    response  =  requests . put ( upload_item . url , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                            data = CallbackReader ( stream ,  callback ,  content_length )  if  callback  else  stream , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                            headers = { * * upload_item . headers ,  ' Content-Length ' :  str ( content_length ) } , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                            timeout = 30 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  response 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  finally : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  stream : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      stream . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# security: user should be able to request any message from their car 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getMessage ( service :  str ,  timeout :  int  =  1000 )  - >  dict : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  service  is  None  or  service  not  in  SERVICE_LIST : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  Exception ( " invalid service " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  socket  =  messaging . sub_sock ( service ,  timeout = timeout ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ret  =  messaging . recv_one ( socket ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  ret  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      raise  TimeoutError 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # this is because capnp._DynamicStructReader doesn't have typing information 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  cast ( dict ,  ret . to_dict ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  finally : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    del  socket 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getVersion ( )  - >  dict [ str ,  str ] : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  build_metadata  =  get_build_metadata ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " version " :  build_metadata . openpilot . version , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " remote " :  build_metadata . openpilot . git_normalized_origin , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " branch " :  build_metadata . channel , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " commit " :  build_metadata . openpilot . git_commit , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  scan_dir ( path :  str ,  prefix :  str )  - >  list [ str ] : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  files  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # only walk directories that match the prefix 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # (glob and friends traverse entire dir tree) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  with  os . scandir ( path )  as  i : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  e  in  i : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      rel_path  =  os . path . relpath ( e . path ,  Paths . log_root ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  e . is_dir ( follow_symlinks = False ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # add trailing slash 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        rel_path  =  os . path . join ( rel_path ,  ' ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # if prefix is a partial dir name, current dir will start with prefix 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # if prefix is a partial file name, prefix with start with dir name 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  rel_path . startswith ( prefix )  or  prefix . startswith ( rel_path ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          files . extend ( scan_dir ( e . path ,  prefix ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  rel_path . startswith ( prefix ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          files . append ( rel_path ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  files 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  listDataDirectory ( prefix = ' ' )  - >  list [ str ] : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  scan_dir ( Paths . log_root ( ) ,  prefix ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  uploadFileToUrl ( fn :  str ,  url :  str ,  headers :  dict [ str ,  str ] )  - >  UploadFilesToUrlResponse : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # this is because mypy doesn't understand that the decorator doesn't change the return type 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  response :  UploadFilesToUrlResponse  =  uploadFilesToUrls ( [ { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " fn " :  fn , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " url " :  url , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " headers " :  headers , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  response 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  uploadFilesToUrls ( files_data :  list [ UploadFileDict ] )  - >  UploadFilesToUrlResponse : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  files  =  map ( UploadFile . from_dict ,  files_data ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  items :  list [ UploadItemDict ]  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  failed :  list [ str ]  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  file  in  files : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( file . fn )  ==  0  or  file . fn [ 0 ]  ==  ' / '  or  ' .. '  in  file . fn  or  len ( file . url )  ==  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      failed . append ( file . fn ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    path  =  os . path . join ( Paths . log_root ( ) ,  file . fn ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  not  os . path . exists ( path )  and  not  os . path . exists ( strip_zst_extension ( path ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      failed . append ( file . fn ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Skip item if already in queue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    url  =  file . url . split ( ' ? ' ) [ 0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  any ( url  ==  item [ ' url ' ] . split ( ' ? ' ) [ 0 ]  for  item  in  listUploadQueue ( ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    item  =  UploadItem ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      path = path , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      url = file . url , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      headers = file . headers , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      created_at = int ( time . time ( )  *  1000 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      id = None , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      allow_cellular = file . allow_cellular , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      priority = file . priority , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    upload_id  =  hashlib . sha1 ( str ( item ) . encode ( ) ) . hexdigest ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    item  =  replace ( item ,  id = upload_id ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    upload_queue . put_nowait ( item ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    items . append ( asdict ( item ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  UploadQueueCache . cache ( upload_queue ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  resp :  UploadFilesToUrlResponse  =  { " enqueued " :  len ( items ) ,  " items " :  items } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  failed : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cloudlog . event ( " athena.uploadFilesToUrls.failed " ,  failed = failed ,  error = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    resp [ " failed " ]  =  failed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  resp 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  listUploadQueue ( )  - >  list [ UploadItemDict ] : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  items  =  list ( upload_queue . queue )  +  list ( cur_upload_items . values ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  [ asdict ( i )  for  i  in  items  if  ( i  is  not  None )  and  ( i . id  not  in  cancelled_uploads ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  cancelUpload ( upload_id :  str  |  list [ str ] )  - >  dict [ str ,  int  |  str ] : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  not  isinstance ( upload_id ,  list ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    upload_id  =  [ upload_id ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  uploading_ids  =  { item . id  for  item  in  list ( upload_queue . queue ) } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  cancelled_ids  =  uploading_ids . intersection ( upload_id ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  len ( cancelled_ids )  ==  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  { " success " :  0 ,  " error " :  " not found " } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  cancelled_uploads . update ( cancelled_ids ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  { " success " :  1 } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  setRouteViewed ( route :  str )  - >  dict [ str ,  int  |  str ] : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # maintain a list of the last 10 routes viewed in connect 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  params  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r  =  params . get ( " AthenadRecentlyViewedRoutes " ,  encoding = " utf8 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  routes  =  [ ]  if  r  is  None  else  r . split ( " , " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  routes . append ( route ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # remove duplicates 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  routes  =  list ( dict . fromkeys ( routes ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  params . put ( " AthenadRecentlyViewedRoutes " ,  " , " . join ( routes [ - 10 : ] ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  { " success " :  1 } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  startLocalProxy ( global_end_event :  threading . Event ,  remote_ws_uri :  str ,  local_port :  int )  - >  dict [ str ,  int ] : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # migration, can be removed once 0.9.8 is out for a while 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  local_port  ==  8022 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      local_port  =  22 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  local_port  not  in  LOCAL_PORT_WHITELIST : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      raise  Exception ( " Requested local port not whitelisted " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cloudlog . debug ( " athena.startLocalProxy.starting " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    dongle_id  =  Params ( ) . get ( " DongleId " ) . decode ( ' utf8 ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    identity_token  =  Api ( dongle_id ) . get_token ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ws  =  create_connection ( remote_ws_uri , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                           cookie = " jwt= "  +  identity_token , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                           enable_multithread = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Set TOS to keep connection responsive while under load. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # DSCP of 36/HDD_LINUX_AC_VI with the minimum delay flag 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ws . sock . setsockopt ( socket . IPPROTO_IP ,  socket . IP_TOS ,  0x90 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ssock ,  csock  =  socket . socketpair ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    local_sock  =  socket . socket ( socket . AF_INET ,  socket . SOCK_STREAM ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    local_sock . connect ( ( ' 127.0.0.1 ' ,  local_port ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    local_sock . setblocking ( False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    proxy_end_event  =  threading . Event ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threads  =  [ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      threading . Thread ( target = ws_proxy_recv ,  args = ( ws ,  local_sock ,  ssock ,  proxy_end_event ,  global_end_event ) ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      threading . Thread ( target = ws_proxy_send ,  args = ( ws ,  local_sock ,  csock ,  proxy_end_event ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  thread  in  threads : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      thread . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cloudlog . debug ( " athena.startLocalProxy.started " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  { " success " :  1 } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  except  Exception  as  e : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cloudlog . exception ( " athenad.startLocalProxy.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  e 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getPublicKey ( )  - >  str  |  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  not  os . path . isfile ( Paths . persist_root ( )  +  ' /comma/id_rsa.pub ' ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  with  open ( Paths . persist_root ( )  +  ' /comma/id_rsa.pub ' )  as  f : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  f . read ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getSshAuthorizedKeys ( )  - >  str : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  Params ( ) . get ( " GithubSshKeys " ,  encoding = ' utf8 ' )  or  ' ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getGithubUsername ( )  - >  str : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  Params ( ) . get ( " GithubUsername " ,  encoding = ' utf8 ' )  or  ' ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getSimInfo ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  HARDWARE . get_sim_info ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getNetworkType ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  HARDWARE . get_network_type ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getNetworkMetered ( )  - >  bool : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  network_type  =  HARDWARE . get_network_type ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  HARDWARE . get_network_metered ( network_type ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  getNetworks ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  HARDWARE . get_networks ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@dispatcher . add_method 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  takeSnapshot ( )  - >  str  |  dict [ str ,  str ]  |  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  from  openpilot . system . camerad . snapshot . snapshot  import  jpeg_write ,  snapshot 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ret  =  snapshot ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  ret  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def  b64jpeg ( x ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  x  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        f  =  io . BytesIO ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        jpeg_write ( f ,  x ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return  base64 . b64encode ( f . getvalue ( ) ) . decode ( " utf-8 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  { ' jpegBack ' :  b64jpeg ( ret [ 0 ] ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            ' jpegFront ' :  b64jpeg ( ret [ 1 ] ) } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  Exception ( " not available while camerad is started " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_logs_to_send_sorted ( )  - >  list [ str ] : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # TODO: scan once then use inotify to detect file creation/deletion 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  curr_time  =  int ( time . time ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  logs  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  log_entry  in  os . listdir ( Paths . swaglog_root ( ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    log_path  =  os . path . join ( Paths . swaglog_root ( ) ,  log_entry ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time_sent  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      value  =  getxattr ( log_path ,  LOG_ATTR_NAME ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  value  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        time_sent  =  int . from_bytes ( value ,  sys . byteorder ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  ( ValueError ,  TypeError ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # assume send failed and we lost the response if sent more than one hour ago 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  not  time_sent  or  curr_time  -  time_sent  >  3600 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      logs . append ( log_entry ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # excluding most recent (active) log file 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  sorted ( logs ) [ : - 1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  log_handler ( end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  PC : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  log_files  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  last_scan  =  0. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      curr_scan  =  time . monotonic ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  curr_scan  -  last_scan  >  10 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        log_files  =  get_logs_to_send_sorted ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        last_scan  =  curr_scan 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # send one log 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      curr_log  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  len ( log_files )  >  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        log_entry  =  log_files . pop ( )  # newest log file 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . debug ( f " athena.log_handler.forward_request  { log_entry } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          curr_time  =  int ( time . time ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          log_path  =  os . path . join ( Paths . swaglog_root ( ) ,  log_entry ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          setxattr ( log_path ,  LOG_ATTR_NAME ,  int . to_bytes ( curr_time ,  4 ,  sys . byteorder ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          with  open ( log_path )  as  f : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            jsonrpc  =  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " method " :  " forwardLogs " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " params " :  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                " logs " :  f . read ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              } , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " jsonrpc " :  " 2.0 " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " id " :  log_entry 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            low_priority_send_queue . put_nowait ( json . dumps ( jsonrpc ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            curr_log  =  log_entry 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        except  OSError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          pass   # file could be deleted by log rotation 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # wait for response up to ~100 seconds 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # always read queue at least once to process any old responses that arrive 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  _  in  range ( 100 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          log_resp  =  json . loads ( log_recv_queue . get ( timeout = 1 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          log_entry  =  log_resp . get ( " id " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          log_success  =  " result "  in  log_resp  and  log_resp [ " result " ] . get ( " success " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          cloudlog . debug ( f " athena.log_handler.forward_response  { log_entry }   { log_success } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  log_entry  and  log_success : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            log_path  =  os . path . join ( Paths . swaglog_root ( ) ,  log_entry ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              setxattr ( log_path ,  LOG_ATTR_NAME ,  LOG_ATTR_VALUE_MAX_UNIX_TIME ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            except  OSError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              pass   # file could be deleted by log rotation 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  curr_log  ==  log_entry : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        except  queue . Empty : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  curr_log  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athena.log_handler.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  stat_handler ( end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  STATS_DIR  =  Paths . stats_root ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  last_scan  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    curr_scan  =  time . monotonic ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  curr_scan  -  last_scan  >  10 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        stat_filenames  =  list ( filter ( lambda  name :  not  name . startswith ( tempfile . gettempprefix ( ) ) ,  os . listdir ( STATS_DIR ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  len ( stat_filenames )  >  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          stat_path  =  os . path . join ( STATS_DIR ,  stat_filenames [ 0 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          with  open ( stat_path )  as  f : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            jsonrpc  =  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " method " :  " storeStats " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " params " :  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                " stats " :  f . read ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              } , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " jsonrpc " :  " 2.0 " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              " id " :  stat_filenames [ 0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            low_priority_send_queue . put_nowait ( json . dumps ( jsonrpc ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          os . remove ( stat_path ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        last_scan  =  curr_scan 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athena.stat_handler.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 0.1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  ws_proxy_recv ( ws :  WebSocket ,  local_sock :  socket . socket ,  ssock :  socket . socket ,  end_event :  threading . Event ,  global_end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  ( end_event . is_set ( )  or  global_end_event . is_set ( ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      r  =  select . select ( ( ws . sock , ) ,  ( ) ,  ( ) ,  30 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  r [ 0 ] : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        data  =  ws . recv ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  isinstance ( data ,  str ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          data  =  data . encode ( " utf-8 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        local_sock . sendall ( data ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  WebSocketTimeoutException : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athenad.ws_proxy_recv.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  cloudlog . debug ( " athena.ws_proxy_recv closing sockets " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ssock . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  local_sock . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ws . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  cloudlog . debug ( " athena.ws_proxy_recv done closing sockets " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  ws_proxy_send ( ws :  WebSocket ,  local_sock :  socket . socket ,  signal_sock :  socket . socket ,  end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      r ,  _ ,  _  =  select . select ( ( local_sock ,  signal_sock ) ,  ( ) ,  ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  r : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  r [ 0 ] . fileno ( )  ==  signal_sock . fileno ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          # got end signal from ws_proxy_recv 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        data  =  local_sock . recv ( 4096 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  not  data : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          # local_sock is dead 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ws . send ( data ,  ABNF . OPCODE_BINARY ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athenad.ws_proxy_send.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  cloudlog . debug ( " athena.ws_proxy_send closing sockets " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  signal_sock . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  cloudlog . debug ( " athena.ws_proxy_send done closing sockets " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  ws_recv ( ws :  WebSocket ,  end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  last_ping  =  int ( time . monotonic ( )  *  1e9 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      opcode ,  data  =  ws . recv_data ( control_frame = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  opcode  in  ( ABNF . OPCODE_TEXT ,  ABNF . OPCODE_BINARY ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  opcode  ==  ABNF . OPCODE_TEXT : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          data  =  data . decode ( " utf-8 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        recv_queue . put_nowait ( data ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  opcode  ==  ABNF . OPCODE_PING : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        last_ping  =  int ( time . monotonic ( )  *  1e9 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        Params ( ) . put ( " LastAthenaPingTime " ,  str ( last_ping ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  WebSocketTimeoutException : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ns_since_last_ping  =  int ( time . monotonic ( )  *  1e9 )  -  last_ping 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  ns_since_last_ping  >  RECONNECT_TIMEOUT_S  *  1e9 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . exception ( " athenad.ws_recv.timeout " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athenad.ws_recv.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  ws_send ( ws :  WebSocket ,  end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  not  end_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        data  =  send_queue . get_nowait ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  queue . Empty : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        data  =  low_priority_send_queue . get ( timeout = 1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  i  in  range ( 0 ,  len ( data ) ,  WS_FRAME_SIZE ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        frame  =  data [ i : i + WS_FRAME_SIZE ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        last  =  i  +  WS_FRAME_SIZE  > =  len ( data ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        opcode  =  ABNF . OPCODE_TEXT  if  i  ==  0  else  ABNF . OPCODE_CONT 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ws . send_frame ( ABNF . create_frame ( frame ,  opcode ,  last ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  queue . Empty : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athenad.ws_send.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  ws_manage ( ws :  WebSocket ,  end_event :  threading . Event )  - >  None : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  params  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  onroad_prev  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  sock  =  ws . sock 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  True : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    onroad  =  params . get_bool ( " IsOnroad " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  onroad  !=  onroad_prev : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      onroad_prev  =  onroad 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  sock  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # While not sending data, onroad, we can expect to time out in 7 + (7 * 2) = 21s 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        #                         offroad, we can expect to time out in 30 + (10 * 3) = 60s 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # FIXME: TCP_USER_TIMEOUT is effectively 2x for some reason (32s), so it's mostly unused 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        if  sys . platform  ==  ' linux ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          sock . setsockopt ( socket . IPPROTO_TCP ,  socket . TCP_USER_TIMEOUT ,  16000  if  onroad  else  0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          sock . setsockopt ( socket . IPPROTO_TCP ,  socket . TCP_KEEPIDLE ,  7  if  onroad  else  30 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        elif  sys . platform  ==  ' darwin ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          sock . setsockopt ( socket . IPPROTO_TCP ,  socket . TCP_KEEPALIVE ,  7  if  onroad  else  30 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        sock . setsockopt ( socket . IPPROTO_TCP ,  socket . TCP_KEEPINTVL ,  7  if  onroad  else  10 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        sock . setsockopt ( socket . IPPROTO_TCP ,  socket . TCP_KEEPCNT ,  2  if  onroad  else  3 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  end_event . wait ( 5 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  backoff ( retries :  int )  - >  int : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  random . randrange ( 0 ,  min ( 128 ,  int ( 2  * *  retries ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  main ( exit_event :  threading . Event  =  None ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    set_core_affinity ( [ 0 ,  1 ,  2 ,  3 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cloudlog . exception ( " failed to set core affinity " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  params  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  dongle_id  =  params . get ( " DongleId " ,  encoding = ' utf-8 ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  UploadQueueCache . initialize ( upload_queue ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ws_uri  =  ATHENA_HOST  +  " /ws/v2/ "  +  dongle_id 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  api  =  Api ( dongle_id ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  conn_start  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  conn_retries  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  exit_event  is  None  or  not  exit_event . is_set ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  conn_start  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        conn_start  =  time . monotonic ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . event ( " athenad.main.connecting_ws " ,  ws_uri = ws_uri ,  retries = conn_retries ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ws  =  create_connection ( ws_uri , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                             cookie = " jwt= "  +  api . get_token ( ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                             enable_multithread = True , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                             timeout = 30.0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . event ( " athenad.main.connected_ws " ,  ws_uri = ws_uri ,  retries = conn_retries , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                     duration = time . monotonic ( )  -  conn_start ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      conn_start  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      conn_retries  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cur_upload_items . clear ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      handle_long_poll ( ws ,  exit_event ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ws . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  ( KeyboardInterrupt ,  SystemExit ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  ( ConnectionError ,  TimeoutError ,  WebSocketException ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      conn_retries  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      params . remove ( " LastAthenaPingTime " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cloudlog . exception ( " athenad.main.exception " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      conn_retries  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      params . remove ( " LastAthenaPingTime " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( backoff ( conn_retries ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  main ( )