You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							367 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
	
	
							367 lines
						
					
					
						
							12 KiB
						
					
					
				| # -*- coding: utf-8 -
 | |
| #
 | |
| # This file is part of gunicorn released under the MIT license.
 | |
| # See the NOTICE for more information.
 | |
| 
 | |
| # design:
 | |
| # a threaded worker accepts connections in the main loop, accepted
 | |
| # connections are are added to the thread pool as a connection job. On
 | |
| # keepalive connections are put back in the loop waiting for an event.
 | |
| # If no event happen after the keep alive timeout, the connectoin is
 | |
| # closed.
 | |
| 
 | |
| from collections import deque
 | |
| from datetime import datetime
 | |
| import errno
 | |
| from functools import partial
 | |
| import os
 | |
| import socket
 | |
| import ssl
 | |
| import sys
 | |
| from threading import RLock
 | |
| import time
 | |
| 
 | |
| from .. import http
 | |
| from ..http import wsgi
 | |
| from .. import util
 | |
| from . import base
 | |
| from .. import six
 | |
| 
 | |
| 
 | |
| try:
 | |
|     import concurrent.futures as futures
 | |
| except ImportError:
 | |
|     raise RuntimeError("""
 | |
|     You need to install the 'futures' package to use this worker with this
 | |
|     Python version.
 | |
|     """)
 | |
| 
 | |
| try:
 | |
|     from asyncio import selectors
 | |
| except ImportError:
 | |
|     from gunicorn import selectors
 | |
| 
 | |
| 
 | |
| class TConn(object):
 | |
| 
 | |
|     def __init__(self, cfg, sock, client, server):
 | |
|         self.cfg = cfg
 | |
|         self.sock = sock
 | |
|         self.client = client
 | |
|         self.server = server
 | |
| 
 | |
|         self.timeout = None
 | |
|         self.parser = None
 | |
| 
 | |
|         # set the socket to non blocking
 | |
|         self.sock.setblocking(False)
 | |
| 
 | |
|     def init(self):
 | |
|         self.sock.setblocking(True)
 | |
|         if self.parser is None:
 | |
|             # wrap the socket if needed
 | |
|             if self.cfg.is_ssl:
 | |
|                 self.sock = ssl.wrap_socket(self.sock, server_side=True,
 | |
|                         **self.cfg.ssl_options)
 | |
| 
 | |
|             # initialize the parser
 | |
|             self.parser = http.RequestParser(self.cfg, self.sock)
 | |
| 
 | |
|     def set_timeout(self):
 | |
|         # set the timeout
 | |
|         self.timeout = time.time() + self.cfg.keepalive
 | |
| 
 | |
|     def close(self):
 | |
|         util.close(self.sock)
 | |
| 
 | |
| 
 | |
| class ThreadWorker(base.Worker):
 | |
| 
 | |
|     def __init__(self, *args, **kwargs):
 | |
|         super(ThreadWorker, self).__init__(*args, **kwargs)
 | |
|         self.worker_connections = self.cfg.worker_connections
 | |
|         self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
 | |
|         # initialise the pool
 | |
|         self.tpool = None
 | |
|         self.poller = None
 | |
|         self._lock = None
 | |
|         self.futures = deque()
 | |
|         self._keep = deque()
 | |
|         self.nr_conns = 0
 | |
| 
 | |
|     @classmethod
 | |
|     def check_config(cls, cfg, log):
 | |
|         max_keepalived = cfg.worker_connections - cfg.threads
 | |
| 
 | |
|         if max_keepalived <= 0 and cfg.keepalive:
 | |
|             log.warning("No keepalived connections can be handled. " +
 | |
|                     "Check the number of worker connections and threads.")
 | |
| 
 | |
|     def init_process(self):
 | |
|         self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
 | |
|         self.poller = selectors.DefaultSelector()
 | |
|         self._lock = RLock()
 | |
|         super(ThreadWorker, self).init_process()
 | |
| 
 | |
|     def handle_quit(self, sig, frame):
 | |
|         self.alive = False
 | |
|         # worker_int callback
 | |
|         self.cfg.worker_int(self)
 | |
|         self.tpool.shutdown(False)
 | |
|         time.sleep(0.1)
 | |
|         sys.exit(0)
 | |
| 
 | |
|     def _wrap_future(self, fs, conn):
 | |
|         fs.conn = conn
 | |
|         self.futures.append(fs)
 | |
|         fs.add_done_callback(self.finish_request)
 | |
| 
 | |
|     def enqueue_req(self, conn):
 | |
|         conn.init()
 | |
|         # submit the connection to a worker
 | |
|         fs = self.tpool.submit(self.handle, conn)
 | |
|         self._wrap_future(fs, conn)
 | |
| 
 | |
|     def accept(self, server, listener):
 | |
|         try:
 | |
|             sock, client = listener.accept()
 | |
|             # initialize the connection object
 | |
|             conn = TConn(self.cfg, sock, client, server)
 | |
|             self.nr_conns += 1
 | |
|             # enqueue the job
 | |
|             self.enqueue_req(conn)
 | |
|         except EnvironmentError as e:
 | |
|             if e.errno not in (errno.EAGAIN,
 | |
|                     errno.ECONNABORTED, errno.EWOULDBLOCK):
 | |
|                 raise
 | |
| 
 | |
|     def reuse_connection(self, conn, client):
 | |
|         with self._lock:
 | |
|             # unregister the client from the poller
 | |
|             self.poller.unregister(client)
 | |
|             # remove the connection from keepalive
 | |
|             try:
 | |
|                 self._keep.remove(conn)
 | |
|             except ValueError:
 | |
|                 # race condition
 | |
|                 return
 | |
| 
 | |
|         # submit the connection to a worker
 | |
|         self.enqueue_req(conn)
 | |
| 
 | |
|     def murder_keepalived(self):
 | |
|         now = time.time()
 | |
|         while True:
 | |
|             with self._lock:
 | |
|                 try:
 | |
|                     # remove the connection from the queue
 | |
|                     conn = self._keep.popleft()
 | |
|                 except IndexError:
 | |
|                     break
 | |
| 
 | |
|             delta = conn.timeout - now
 | |
|             if delta > 0:
 | |
|                 # add the connection back to the queue
 | |
|                 with self._lock:
 | |
|                     self._keep.appendleft(conn)
 | |
|                 break
 | |
|             else:
 | |
|                 self.nr_conns -= 1
 | |
|                 # remove the socket from the poller
 | |
|                 with self._lock:
 | |
|                     try:
 | |
|                         self.poller.unregister(conn.sock)
 | |
|                     except EnvironmentError as e:
 | |
|                         if e.errno != errno.EBADF:
 | |
|                             raise
 | |
|                     except KeyError:
 | |
|                         # already removed by the system, continue
 | |
|                         pass
 | |
| 
 | |
|                 # close the socket
 | |
|                 conn.close()
 | |
| 
 | |
|     def is_parent_alive(self):
 | |
|         # If our parent changed then we shut down.
 | |
|         if self.ppid != os.getppid():
 | |
|             self.log.info("Parent changed, shutting down: %s", self)
 | |
|             return False
 | |
|         return True
 | |
| 
 | |
|     def run(self):
 | |
|         # init listeners, add them to the event loop
 | |
|         for sock in self.sockets:
 | |
|             sock.setblocking(False)
 | |
|             # a race condition during graceful shutdown may make the listener
 | |
|             # name unavailable in the request handler so capture it once here
 | |
|             server = sock.getsockname()
 | |
|             acceptor = partial(self.accept, server)
 | |
|             self.poller.register(sock, selectors.EVENT_READ, acceptor)
 | |
| 
 | |
|         while self.alive:
 | |
|             # notify the arbiter we are alive
 | |
|             self.notify()
 | |
| 
 | |
|             # can we accept more connections?
 | |
|             if self.nr_conns < self.worker_connections:
 | |
|                 # wait for an event
 | |
|                 events = self.poller.select(1.0)
 | |
|                 for key, _ in events:
 | |
|                     callback = key.data
 | |
|                     callback(key.fileobj)
 | |
| 
 | |
|                 # check (but do not wait) for finished requests
 | |
|                 result = futures.wait(self.futures, timeout=0,
 | |
|                         return_when=futures.FIRST_COMPLETED)
 | |
|             else:
 | |
|                 # wait for a request to finish
 | |
|                 result = futures.wait(self.futures, timeout=1.0,
 | |
|                         return_when=futures.FIRST_COMPLETED)
 | |
| 
 | |
|             # clean up finished requests
 | |
|             for fut in result.done:
 | |
|                 self.futures.remove(fut)
 | |
| 
 | |
|             if not self.is_parent_alive():
 | |
|                 break
 | |
| 
 | |
|             # hanle keepalive timeouts
 | |
|             self.murder_keepalived()
 | |
| 
 | |
|         self.tpool.shutdown(False)
 | |
|         self.poller.close()
 | |
| 
 | |
|         for s in self.sockets:
 | |
|             s.close()
 | |
| 
 | |
|         futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
 | |
| 
 | |
|     def finish_request(self, fs):
 | |
|         if fs.cancelled():
 | |
|             self.nr_conns -= 1
 | |
|             fs.conn.close()
 | |
|             return
 | |
| 
 | |
|         try:
 | |
|             (keepalive, conn) = fs.result()
 | |
|             # if the connection should be kept alived add it
 | |
|             # to the eventloop and record it
 | |
|             if keepalive:
 | |
|                 # flag the socket as non blocked
 | |
|                 conn.sock.setblocking(False)
 | |
| 
 | |
|                 # register the connection
 | |
|                 conn.set_timeout()
 | |
|                 with self._lock:
 | |
|                     self._keep.append(conn)
 | |
| 
 | |
|                     # add the socket to the event loop
 | |
|                     self.poller.register(conn.sock, selectors.EVENT_READ,
 | |
|                             partial(self.reuse_connection, conn))
 | |
|             else:
 | |
|                 self.nr_conns -= 1
 | |
|                 conn.close()
 | |
|         except:
 | |
|             # an exception happened, make sure to close the
 | |
|             # socket.
 | |
|             self.nr_conns -= 1
 | |
|             fs.conn.close()
 | |
| 
 | |
|     def handle(self, conn):
 | |
|         keepalive = False
 | |
|         req = None
 | |
|         try:
 | |
|             req = six.next(conn.parser)
 | |
|             if not req:
 | |
|                 return (False, conn)
 | |
| 
 | |
|             # handle the request
 | |
|             keepalive = self.handle_request(req, conn)
 | |
|             if keepalive:
 | |
|                 return (keepalive, conn)
 | |
|         except http.errors.NoMoreData as e:
 | |
|             self.log.debug("Ignored premature client disconnection. %s", e)
 | |
| 
 | |
|         except StopIteration as e:
 | |
|             self.log.debug("Closing connection. %s", e)
 | |
|         except ssl.SSLError as e:
 | |
|             if e.args[0] == ssl.SSL_ERROR_EOF:
 | |
|                 self.log.debug("ssl connection closed")
 | |
|                 conn.sock.close()
 | |
|             else:
 | |
|                 self.log.debug("Error processing SSL request.")
 | |
|                 self.handle_error(req, conn.sock, conn.client, e)
 | |
| 
 | |
|         except EnvironmentError as e:
 | |
|             if e.errno not in (errno.EPIPE, errno.ECONNRESET):
 | |
|                 self.log.exception("Socket error processing request.")
 | |
|             else:
 | |
|                 if e.errno == errno.ECONNRESET:
 | |
|                     self.log.debug("Ignoring connection reset")
 | |
|                 else:
 | |
|                     self.log.debug("Ignoring connection epipe")
 | |
|         except Exception as e:
 | |
|             self.handle_error(req, conn.sock, conn.client, e)
 | |
| 
 | |
|         return (False, conn)
 | |
| 
 | |
|     def handle_request(self, req, conn):
 | |
|         environ = {}
 | |
|         resp = None
 | |
|         try:
 | |
|             self.cfg.pre_request(self, req)
 | |
|             request_start = datetime.now()
 | |
|             resp, environ = wsgi.create(req, conn.sock, conn.client,
 | |
|                     conn.server, self.cfg)
 | |
|             environ["wsgi.multithread"] = True
 | |
|             self.nr += 1
 | |
|             if self.alive and self.nr >= self.max_requests:
 | |
|                 self.log.info("Autorestarting worker after current request.")
 | |
|                 resp.force_close()
 | |
|                 self.alive = False
 | |
| 
 | |
|             if not self.cfg.keepalive:
 | |
|                 resp.force_close()
 | |
|             elif len(self._keep) >= self.max_keepalived:
 | |
|                 resp.force_close()
 | |
| 
 | |
|             respiter = self.wsgi(environ, resp.start_response)
 | |
|             try:
 | |
|                 if isinstance(respiter, environ['wsgi.file_wrapper']):
 | |
|                     resp.write_file(respiter)
 | |
|                 else:
 | |
|                     for item in respiter:
 | |
|                         resp.write(item)
 | |
| 
 | |
|                 resp.close()
 | |
|                 request_time = datetime.now() - request_start
 | |
|                 self.log.access(resp, req, environ, request_time)
 | |
|             finally:
 | |
|                 if hasattr(respiter, "close"):
 | |
|                     respiter.close()
 | |
| 
 | |
|             if resp.should_close():
 | |
|                 self.log.debug("Closing connection.")
 | |
|                 return False
 | |
|         except EnvironmentError:
 | |
|             # pass to next try-except level
 | |
|             six.reraise(*sys.exc_info())
 | |
|         except Exception:
 | |
|             if resp and resp.headers_sent:
 | |
|                 # If the requests have already been sent, we should close the
 | |
|                 # connection to indicate the error.
 | |
|                 self.log.exception("Error handling request")
 | |
|                 try:
 | |
|                     conn.sock.shutdown(socket.SHUT_RDWR)
 | |
|                     conn.sock.close()
 | |
|                 except EnvironmentError:
 | |
|                     pass
 | |
|                 raise StopIteration()
 | |
|             raise
 | |
|         finally:
 | |
|             try:
 | |
|                 self.cfg.post_request(self, req, environ, resp)
 | |
|             except Exception:
 | |
|                 self.log.exception("Exception in post_request hook")
 | |
| 
 | |
|         return True
 | |
| 
 |