You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					219 lines
				
				6.8 KiB
			
		
		
			
		
	
	
					219 lines
				
				6.8 KiB
			| 
								 
											6 years ago
										 
									 | 
							
								# coding: utf-8
							 | 
						||
| 
								 | 
							
								# vim: set ts=4 sw=4 et:
							 | 
						||
| 
								 | 
							
								""" This file contains some utils for connecting to Logentries
							 | 
						||
| 
								 | 
							
								    as well as storing logs in a queue and sending them."""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								VERSION = '2.0.7'
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from logentries import helpers as le_helpers
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import logging
							 | 
						||
| 
								 | 
							
								import threading
							 | 
						||
| 
								 | 
							
								import socket
							 | 
						||
| 
								 | 
							
								import random
							 | 
						||
| 
								 | 
							
								import time
							 | 
						||
| 
								 | 
							
								import sys
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import certifi
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Capacity passed to the internal event queue
								QUEUE_SIZE = 32768
								# Default address of the Logentries API server
								LE_API_DEFAULT = "data.logentries.com"
								# Default port numbers for token logging to the Logentries API server
								# (plain text and TLS respectively)
								LE_PORT_DEFAULT = 80
								LE_TLS_PORT_DEFAULT = 443
								# Starting value, in seconds, of the reconnection back-off delay
								MIN_DELAY = 0.1
								# Cap, in seconds, on the reconnection back-off delay (before jitter)
								MAX_DELAY = 10
								# Unicode line separator character (U+2028)
								LINE_SEP = le_helpers.to_unicode('\u2028')


								# Signature prepended to this appender's debugging messages
								LE = "LE: "
								# Message displayed when an incorrect token has been detected
								INVALID_TOKEN = (
								    "\n\nIt appears the LOGENTRIES_TOKEN "
								    "parameter you entered is incorrect!\n\n"
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def dbg(msg):
								    """Print *msg* prefixed with the ``LE`` appender signature."""
								    prefixed = LE + msg
								    print(prefixed)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class PlainTextSocketAppender(threading.Thread):
								    """Background thread that sends queued log lines over a plain TCP socket.

								    Messages are taken from ``self._queue`` one at a time and written to
								    ``le_api`` on ``le_port``; the connection is re-established whenever
								    a send fails.
								    """

								    def __init__(self, verbose=True, le_api=LE_API_DEFAULT, le_port=LE_PORT_DEFAULT, le_tls_port=LE_TLS_PORT_DEFAULT):
								        """Store the connection settings and create the outgoing queue.

								        :param verbose: when true, diagnostics are printed through ``dbg``.
								        :param le_api: Logentries API server address.
								        :param le_port: port number for plain-text token logging.
								        :param le_tls_port: port number for TLS token logging; stored
								            here but not used by this class's own ``open_connection``.
								        """
								        threading.Thread.__init__(self)

								        # Logentries API server address
								        self.le_api = le_api

								        # Port numbers for token logging to the Logentries API server
								        self.le_port = le_port
								        self.le_tls_port = le_tls_port

								        # Daemon thread: it does not keep the interpreter alive at exit
								        self.daemon = True
								        self.verbose = verbose
								        self._conn = None
								        # NOTE(review): presumably a bounded FIFO holding at most
								        # QUEUE_SIZE entries -- confirm against le_helpers.create_queue
								        self._queue = le_helpers.create_queue(QUEUE_SIZE)

								    def empty(self):
								        """Return whether the outgoing queue currently reports empty."""
								        return self._queue.empty()

								    def open_connection(self):
								        """Open a plain TCP connection to ``(le_api, le_port)``.

								        ``self._conn`` is replaced only after the connection succeeds.
								        A socket that fails to connect is closed before the error
								        propagates, so repeated retries do not pile up open descriptors.
								        """
								        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
								        try:
								            sock.connect((self.le_api, self.le_port))
								        except Exception:
								            sock.close()
								            raise
								        self._conn = sock

								    def reopen_connection(self):
								        """Close the current connection and retry until a new one opens.

								        After each failed attempt the base delay is doubled (capped at
								        ``MAX_DELAY``) and the thread sleeps for that delay plus a random
								        extra of up to the same amount.
								        """
								        self.close_connection()

								        root_delay = MIN_DELAY
								        while True:
								            try:
								                self.open_connection()
								                return
								            except Exception:
								                if self.verbose:
								                    dbg("Unable to connect to Logentries")

								            root_delay = min(root_delay * 2, MAX_DELAY)

								            # Random jitter spreads out reconnection attempts
								            time.sleep(root_delay + random.uniform(0, root_delay))

								    def close_connection(self):
								        """Close the current socket, if one has been opened.

								        ``self._conn`` is deliberately left in place rather than reset to
								        ``None``: ``run`` keeps calling methods on it and relies on a
								        socket error from the closed socket to trigger a reconnect.
								        """
								        if self._conn is not None:
								            self._conn.close()

								    def run(self):
								        """Thread body: connect, then forward queued messages forever.

								        The loop only ends on ``KeyboardInterrupt``, after which the
								        connection is closed.
								        """
								        try:
								            # Open connection
								            self.reopen_connection()

								            # Send data in queue
								            while True:
								                # Take data from queue
								                data = self._queue.get(block=True)

								                # Replace newlines with Unicode line separator
								                # for multi-line events
								                if not le_helpers.is_unicode(data):
								                    multiline = le_helpers.create_unicode(data).replace(
								                        '\n', LINE_SEP)
								                else:
								                    multiline = data.replace('\n', LINE_SEP)
								                multiline += "\n"
								                # Encode once; the same bytes are reused on every resend
								                payload = multiline.encode('utf-8')
								                # Send data, reconnect if needed
								                while True:
								                    try:
								                        # sendall() keeps writing until the whole line
								                        # is out; send() may stop after a partial write
								                        # and silently truncate the log entry.
								                        self._conn.sendall(payload)
								                    except socket.error:
								                        self.reopen_connection()
								                        continue
								                    break
								        except KeyboardInterrupt:
								            if self.verbose:
								                dbg("Logentries asynchronous socket client interrupted")

								        self.close_connection()

								# Default appender used when TLS is not requested or not available
								SocketAppender = PlainTextSocketAppender
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								try:
								    import ssl
								    ssl_enabled = True
								except ImportError:  # for systems without TLS support.
								    ssl_enabled = False
								    dbg("Unable to import ssl module. Will send over port 80.")
								else:
								    class TLSSocketAppender(PlainTextSocketAppender):
								        """Appender that sends queued log lines over a TLS connection."""

								        def open_connection(self):
								            """Open a TLS connection to ``(le_api, le_tls_port)``.

								            The server certificate is validated against the ``certifi``
								            CA bundle. Where ``ssl.create_default_context`` exists the
								            server host name is verified as well; otherwise the legacy
								            ``ssl.wrap_socket`` call is used as a fallback.

								            ``self._conn`` is replaced only after the connection
								            succeeds; a socket that fails is closed before the error
								            propagates.
								            """
								            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
								            try:
								                if hasattr(ssl, 'create_default_context'):
								                    # ssl.wrap_socket() is deprecated and no longer
								                    # exists in Python 3.12+, and it never checked the
								                    # host name. The default context requires a valid
								                    # certificate and a matching host name.
								                    context = ssl.create_default_context(
								                        cafile=certifi.where())
								                    sock = context.wrap_socket(
								                        sock, server_hostname=self.le_api)
								                else:
								                    # Legacy interpreters without SSLContext helpers
								                    sock = ssl.wrap_socket(
								                        sock=sock,
								                        keyfile=None,
								                        certfile=None,
								                        server_side=False,
								                        cert_reqs=ssl.CERT_REQUIRED,
								                        ssl_version=getattr(
								                            ssl,
								                            'PROTOCOL_TLSv1_2',
								                            ssl.PROTOCOL_TLSv1
								                        ),
								                        ca_certs=certifi.where(),
								                        do_handshake_on_connect=True,
								                        suppress_ragged_eofs=True,
								                    )

								                sock.connect((self.le_api, self.le_tls_port))
								            except Exception:
								                # Do not leak the descriptor while the caller retries
								                sock.close()
								                raise
								            self._conn = sock
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class LogentriesHandler(logging.Handler):
							 | 
						||
| 
								 | 
							
								    def __init__(self, token, use_tls=True, verbose=True, format=None, le_api=LE_API_DEFAULT, le_port=LE_PORT_DEFAULT, le_tls_port=LE_TLS_PORT_DEFAULT):
								        """Configure the handler and create (but not start) its appender.

								        :param token: Logentries token; it is prepended to every message.
								        :param use_tls: use the TLS appender when the ``ssl`` module
								            was importable, otherwise the plain-text one.
								        :param verbose: when true, diagnostics are printed through ``dbg``.
								        :param format: a ``logging.Formatter`` to install; a default
								            timestamp/level/message formatter is used when ``None``.
								        :param le_api: Logentries API server address.
								        :param le_port: port number for plain-text token logging.
								        :param le_tls_port: port number for TLS token logging.
								        """
								        logging.Handler.__init__(self)
								        self.token = token
								        self.good_config = True
								        self.verbose = verbose
								        # The socket gets 10 seconds to flush; after that logs are dropped
								        self.timeout = 10

								        token_ok = le_helpers.check_token(token)
								        if not token_ok:
								            if self.verbose:
								                dbg(INVALID_TOKEN)
								            # A bad token keeps the appender thread from being started
								            self.good_config = False

								        formatter = format
								        if formatter is None:
								            formatter = logging.Formatter(
								                '%(asctime)s : %(levelname)s, %(message)s',
								                '%a %b %d %H:%M:%S %Z %Y',
								            )
								        self.setFormatter(formatter)
								        self.setLevel(logging.DEBUG)

								        # TLSSocketAppender is only looked up when ssl was importable
								        appender_cls = (
								            TLSSocketAppender if use_tls and ssl_enabled else SocketAppender
								        )
								        self._thread = appender_cls(
								            verbose=verbose,
								            le_api=le_api,
								            le_port=le_port,
								            le_tls_port=le_tls_port,
								        )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def flush(self):
								        """Wait for the appender's queue to drain, up to ``self.timeout``.

								        The queue is polled every 0.2 seconds; the wait is abandoned once
								        more than ``self.timeout`` seconds have passed since the call.
								        """
								        started = time.time()
								        while True:
								            if self._thread.empty():
								                # All queued logs have been taken by the appender
								                return
								            time.sleep(0.2)
								            elapsed = time.time() - started
								            if elapsed > self.timeout:
								                return
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def emit_raw(self, msg):
								        """Queue *msg*, prefixed with the token, for the appender thread.

								        The appender thread is started on first use when the token
								        passed validation. If the message cannot be queued, one queued
								        message is discarded and the put is retried once; a second
								        failure is ignored.
								        """
								        appender = self._thread
								        if self.good_config and not appender.is_alive():
								            try:
								                appender.start()
								                if self.verbose:
								                    dbg("Starting Logentries Asynchronous Socket Appender")
								            except RuntimeError:
								                # The thread has already been started
								                pass

								        payload = self.token + msg
								        try:
								            appender._queue.put_nowait(payload)
								            return
								        except Exception:
								            # Most likely the queue is full; fall through and make room
								            pass

								        try:
								            # Drop the oldest message, then try to queue again
								            appender._queue.get_nowait()
								            appender._queue.put_nowait(payload)
								        except Exception:
								            # Race condition, no need for any action here
								            pass
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def emit(self, record):
								        """Format *record*, strip trailing newlines, and queue the result."""
								        formatted = self.format(record)
								        self.emit_raw(formatted.rstrip('\n'))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def close(self):
							 | 
						||
| 
								 | 
							
								        logging.Handler.close(self)
							 |