You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					219 lines
				
				6.8 KiB
			
		
		
			
		
	
	
					219 lines
				
				6.8 KiB
			| 
											6 years ago
										 | # coding: utf-8
 | ||
|  | # vim: set ts=4 sw=4 et:
 | ||
|  | """ This file contains some utils for connecting to Logentries
 | ||
|  |     as well as storing logs in a queue and sending them."""
 | ||
|  | 
 | ||
|  | VERSION = '2.0.7'
 | ||
|  | 
 | ||
|  | from logentries import helpers as le_helpers
 | ||
|  | 
 | ||
|  | import logging
 | ||
|  | import threading
 | ||
|  | import socket
 | ||
|  | import random
 | ||
|  | import time
 | ||
|  | import sys
 | ||
|  | 
 | ||
|  | import certifi
 | ||
|  | 
 | ||
|  | 
 | ||
# Capacity handed to the internal event queue
QUEUE_SIZE = 32768

# Default Logentries API host
LE_API_DEFAULT = "data.logentries.com"

# Default ports for token logging: plain TCP and TLS respectively
LE_PORT_DEFAULT = 80
LE_TLS_PORT_DEFAULT = 443

# Lower and upper bounds (in seconds) for the delay between reconnect attempts
MIN_DELAY = 0.1
MAX_DELAY = 10

# Unicode line separator character (\u2028); substituted for newlines so a
# multi-line event is transmitted as a single line
LINE_SEP = le_helpers.to_unicode('\u2028')

# LE appender signature - prefix for debugging messages
LE = "LE: "

# Message shown when the supplied token fails validation
INVALID_TOKEN = (
    "\n\n"
    "It appears the LOGENTRIES_TOKEN parameter you entered is incorrect!"
    "\n\n"
)
 | ||
|  | 
 | ||
|  | 
 | ||
def dbg(msg):
    """Print *msg* to stdout, prefixed with the LE appender signature."""
    line = LE + msg
    print(line)
 | ||
|  | 
 | ||
|  | 
 | ||
class PlainTextSocketAppender(threading.Thread):
    """Daemon thread that drains a queue of log entries onto a TCP socket.

    Entries are taken from ``self._queue`` one at a time, embedded newlines
    are replaced by ``LINE_SEP`` so each entry travels as a single line, and
    the UTF-8 encoded result is written to the connection. The connection is
    re-established (with backoff) whenever a socket error occurs.
    """

    def __init__(self, verbose=True, le_api=LE_API_DEFAULT, le_port=LE_PORT_DEFAULT, le_tls_port=LE_TLS_PORT_DEFAULT):
        """Store connection settings and create the internal queue.

        verbose     -- when true, print debugging messages via ``dbg``
        le_api      -- API server address to connect to
        le_port     -- port used by the plain-text connection
        le_tls_port -- port stored for use by TLS-capable subclasses
        """
        threading.Thread.__init__(self)

        # Logentries API server address
        self.le_api = le_api

        # Port numbers for token logging to the Logentries API server
        self.le_port = le_port
        self.le_tls_port = le_tls_port

        self.daemon = True
        self.verbose = verbose
        self._conn = None
        self._queue = le_helpers.create_queue(QUEUE_SIZE)

    def empty(self):
        """Return whether the internal queue currently holds no entries."""
        return self._queue.empty()

    def open_connection(self):
        """Open a plain TCP connection to ``(le_api, le_port)``.

        The socket is only stored in ``self._conn`` once the connection has
        succeeded; on failure it is closed before the error is re-raised so
        repeated retries do not leak sockets.
        """
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((self.le_api, self.le_port))
        except Exception:
            conn.close()
            raise
        self._conn = conn

    def reopen_connection(self):
        """Close the current connection and retry until a new one is open.

        The base delay doubles after every failed attempt, capped at
        ``MAX_DELAY``; the actual sleep is the base delay plus a random
        amount of up to the same size.
        """
        self.close_connection()

        root_delay = MIN_DELAY
        while True:
            try:
                self.open_connection()
                return
            except Exception:
                if self.verbose:
                    dbg("Unable to connect to Logentries")

            root_delay = min(root_delay * 2, MAX_DELAY)
            wait_for = root_delay + random.uniform(0, root_delay)
            time.sleep(wait_for)

    def close_connection(self):
        """Close the current connection, if one was ever opened."""
        if self._conn is not None:
            self._conn.close()

    def run(self):
        """Thread body: connect, then forward queued entries forever."""
        try:
            # Open connection
            self.reopen_connection()

            # Send data in queue
            while True:
                # Take data from queue
                data = self._queue.get(block=True)

                # Replace newlines with Unicode line separator
                # for multi-line events
                if not le_helpers.is_unicode(data):
                    multiline = le_helpers.create_unicode(data).replace(
                        '\n', LINE_SEP)
                else:
                    multiline = data.replace('\n', LINE_SEP)
                multiline += "\n"

                # Encode once; the same bytes are reused for every retry
                payload = multiline.encode('utf-8')

                # Send data, reconnect if needed. sendall() keeps writing
                # until the whole payload is out, whereas send() may stop
                # after a partial write and silently truncate the entry.
                while True:
                    try:
                        self._conn.sendall(payload)
                    except socket.error:
                        self.reopen_connection()
                        continue
                    break
        except KeyboardInterrupt:
            if self.verbose:
                dbg("Logentries asynchronous socket client interrupted")
        finally:
            # Release the socket however the loop ends
            self.close_connection()


SocketAppender = PlainTextSocketAppender
 | ||
|  | 
 | ||
try:
    import ssl
    ssl_enabled = True
except ImportError:  # for systems without TLS support.
    ssl_enabled = False
    dbg("Unable to import ssl module. Will send over port 80.")
else:
    class TLSSocketAppender(PlainTextSocketAppender):
        """Appender that sends entries over TLS to ``(le_api, le_tls_port)``.

        Server certificates are validated against the certifi CA bundle.
        """

        def open_connection(self):
            """Open a TLS connection and store it in ``self._conn``.

            ``ssl.wrap_socket`` was removed in Python 3.12, so an
            ``SSLContext`` is used whenever the interpreter provides
            ``ssl.create_default_context``; that path also verifies that the
            certificate matches ``le_api``. The legacy call is kept only as
            a fallback for interpreters without ``create_default_context``.
            The socket is closed if wrapping or connecting fails.
            """
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                if hasattr(ssl, 'create_default_context'):
                    context = ssl.create_default_context(
                        cafile=certifi.where())
                    if hasattr(ssl, 'TLSVersion'):
                        # Mirror the legacy preference for TLS 1.2
                        context.minimum_version = ssl.TLSVersion.TLSv1_2
                    sock = context.wrap_socket(
                        sock, server_hostname=self.le_api)
                else:
                    sock = ssl.wrap_socket(
                        sock=sock,
                        keyfile=None,
                        certfile=None,
                        server_side=False,
                        cert_reqs=ssl.CERT_REQUIRED,
                        ssl_version=getattr(
                            ssl,
                            'PROTOCOL_TLSv1_2',
                            ssl.PROTOCOL_TLSv1
                        ),
                        ca_certs=certifi.where(),
                        do_handshake_on_connect=True,
                        suppress_ragged_eofs=True,
                    )

                # The TLS handshake is performed as part of connect()
                sock.connect((self.le_api, self.le_tls_port))
            except Exception:
                sock.close()
                raise
            self._conn = sock
 | ||
|  | 
 | ||
|  | 
 | ||
class LogentriesHandler(logging.Handler):
    """Logging handler that queues formatted records for a sender thread.

    Each record is formatted, prefixed with the token and placed on the
    queue of a background appender thread, which is started lazily on the
    first emitted message.
    """

    def __init__(self, token, use_tls=True, verbose=True, format=None, le_api=LE_API_DEFAULT, le_port=LE_PORT_DEFAULT, le_tls_port=LE_TLS_PORT_DEFAULT):
        """Configure the handler and create (but do not start) the appender.

        token       -- Logentries token prepended to every message
        use_tls     -- use the TLS appender when the ssl module is available
        verbose     -- when true, print debugging messages via ``dbg``
        format      -- ``logging.Formatter`` to use; a default one is created
                       when None
        le_api      -- API server address
        le_port     -- port for the plain-text appender
        le_tls_port -- port for the TLS appender
        """
        logging.Handler.__init__(self)
        self.token = token
        self.good_config = True
        self.verbose = verbose
        # give the socket 10 seconds to flush,
        # otherwise drop logs
        self.timeout = 10
        if not le_helpers.check_token(token):
            if self.verbose:
                dbg(INVALID_TOKEN)
            # With a bad token the appender thread is never started
            self.good_config = False
        if format is None:
            format = logging.Formatter('%(asctime)s : %(levelname)s, %(message)s',
                                       '%a %b %d %H:%M:%S %Z %Y')
        self.setFormatter(format)
        self.setLevel(logging.DEBUG)
        if use_tls and ssl_enabled:
            self._thread = TLSSocketAppender(verbose=verbose, le_api=le_api, le_port=le_port, le_tls_port=le_tls_port)
        else:
            self._thread = SocketAppender(verbose=verbose, le_api=le_api, le_port=le_port, le_tls_port=le_tls_port)

    def flush(self):
        """Wait up to ``self.timeout`` seconds for the queue to drain.

        Returns immediately when the appender thread is not running, since
        nothing would consume the queue and waiting could only time out.
        """
        started = time.time()
        while self._thread.is_alive() and not self._thread.empty():
            time.sleep(0.2)
            if time.time() - started > self.timeout:
                break

    def emit_raw(self, msg):
        """Queue the already-formatted *msg*, prefixed with the token.

        Starts the appender thread on first use when the configuration is
        good. If the queue rejects the message, the oldest queued message is
        discarded to make room for it.
        """
        if self.good_config and not self._thread.is_alive():
            try:
                self._thread.start()
                if self.verbose:
                    dbg("Starting Logentries Asynchronous Socket Appender")
            except RuntimeError:  # It's already started.
                pass

        msg = self.token + msg
        try:
            self._thread._queue.put_nowait(msg)
        except Exception:
            # Queue is full, try to remove the oldest message and put again
            try:
                self._thread._queue.get_nowait()
                self._thread._queue.put_nowait(msg)
            except Exception:
                # Race condition, no need for any action here
                pass

    def emit(self, record):
        """Format *record* and queue it for sending.

        Failures while formatting or queueing are routed through
        ``handleError`` so that a logging problem does not raise inside the
        application's logging call.
        """
        try:
            msg = self.format(record).rstrip('\n')
            self.emit_raw(msg)
        except Exception:
            self.handleError(record)

    def close(self):
        """Close the handler via the base ``logging.Handler`` implementation."""
        logging.Handler.close(self)
 |