'''An asynchronous multi-process `HTTP proxy server`_. It works for both
``http`` and ``https`` (tunneled) requests.

Managing Headers
================

It is possible to add middleware to manipulate the original request headers.
If the list of header middleware is empty, the proxy passes requests and
responses through unmodified.

This is an implementation of a forward proxy which can be used
to retrieve any type of resource from the Internet.

To run the server::

    python manage.py

A header middleware is a callable which receives the WSGI *environ* and
the request *headers* container. By default the example uses:

.. autofunction:: x_forwarded_for

To use a different headers middleware, create a new script and do::

    from proxyserver.manage import server

    if __name__ == '__main__':
        server(headers_middleware=[...]).start()

Implementation
==============

.. autoclass:: ProxyServerWsgiHandler
   :members:
   :member-order:

.. _`HTTP proxy server`: http://en.wikipedia.org/wiki/Proxy_server
'''
import logging
from functools import partial
import pulsar
from pulsar import HttpException, ensure_future, create_future
from pulsar.apps import wsgi, http
from pulsar.apps.http.plugins import noerror
from pulsar.utils.httpurl import Headers, ENCODE_BODY_METHODS
from pulsar.utils.log import LocalMixin, local_property
logger = logging.getLogger('pulsar.proxyserver')

#: Software identifier; :func:`server` uses it as the default
#: ``server_software`` of the WSGI server.
SERVER_SOFTWARE = 'Pulsar-proxy-server/{}'.format(pulsar.version)
#: Request headers rebuilt from their CGI-style environ keys
#: (``CONTENT_TYPE`` / ``CONTENT_LENGTH``), which carry no ``HTTP_`` prefix.
ENVIRON_HEADERS = ('content-type', 'content-length')
#: Alias of :data:`SERVER_SOFTWARE`; presumably meant as a ``User-Agent``
#: value (not referenced elsewhere in this module) — confirm before removing.
USER_AGENT = SERVER_SOFTWARE
def x_forwarded_for(environ, headers):
    '''Add an *x-forwarded-for* header carrying the client address.

    :param environ: the WSGI environ; ``REMOTE_ADDR`` is read from it.
    :param headers: the outgoing request headers container; it must
        provide ``add_header(name, value)``.

    ``REMOTE_ADDR`` is not one of the environ keys PEP 3333 guarantees.
    When it is missing or empty, the headers are left untouched instead of
    failing the whole proxied request with a ``KeyError``.
    '''
    remote_addr = environ.get('REMOTE_ADDR')
    if remote_addr:
        headers.add_header('x-forwarded-for', remote_addr)
class ProxyServerWsgiHandler(LocalMixin):
    '''WSGI middleware for an asynchronous proxy server.

    To perform processing on headers you can pass a list of
    ``headers_middleware``.
    A headers middleware is a callable which accepts two parameters: the
    WSGI *environ* dictionary and the *headers* container.
    '''
    def __init__(self, headers_middleware=None):
        self.headers_middleware = headers_middleware or []

    @local_property
    def http_client(self):
        '''The :class:`.HttpClient` used by this proxy middleware for
        accessing upstream resources.

        Responses are not decompressed and cookies are not stored, so
        upstream bytes are relayed as received.
        '''
        client = http.HttpClient(decompress=False, store_cookies=False)
        # Drop the client's own default headers; presumably this keeps them
        # from being mixed into the headers rebuilt from the downstream
        # request — confirm against pulsar's HttpClient.
        client.headers.clear()
        return client

    def __call__(self, environ, start_response):
        '''Proxy one WSGI request.

        :raises HttpException: with status 404 when the request target is
            missing or is a path (``/...``). A forward proxy expects an
            absolute URI or, for ``CONNECT``, an ``authority``.
        :return: the future of the :class:`TunnelResponse` handling the
            request.
        '''
        uri = environ.get('RAW_URI')
        logger.debug('new request for %r', uri)
        if not uri or uri.startswith('/'):  # No proper uri, raise 404
            raise HttpException(status=404)
        response = TunnelResponse(self, environ, start_response)
        ensure_future(response.request())
        return response.future
############################################################################
# RESPONSE OBJECTS
class TunnelResponse:
    '''Base WSGI response for the proxy server.

    :class:`ProxyServerWsgiHandler` creates one instance per proxied
    request. :meth:`request` performs the upstream call, and :attr:`future`
    is what the WSGI handler returns. The future is settled:

    * with ``[b'']`` once a ``CONNECT`` tunnel is established
      (see :meth:`pre_request`);
    * with :class:`~pulsar.apps.wsgi.AbortWsgi` once a plain HTTP upstream
      request finishes, because its body was already written straight to
      the downstream connection (see :meth:`post_request`);
    * with the failure raised while issuing the request (see :meth:`error`).
    '''
    def __init__(self, wsgi, environ, start_response):
        self.wsgi = wsgi
        self.environ = environ
        self.start_response = start_response
        self.future = create_future()

    async def request(self):
        '''Perform the HTTP request to the upstream server.

        Every step is inside the ``try``, including building the headers
        (which runs user-supplied middleware) and creating the HTTP client.
        Any failure therefore reaches :meth:`error`, and :attr:`future` is
        never left pending with the downstream client waiting forever.
        '''
        environ = self.environ
        try:
            method = environ['REQUEST_METHOD']
            request_headers = self.request_headers()
            data = None
            if method in ENCODE_BODY_METHODS:
                data = DataIterator(self)
            client = self.wsgi.http_client
            await client.request(method,
                                 environ['RAW_URI'],
                                 data=data,
                                 headers=request_headers,
                                 version=environ['SERVER_PROTOCOL'],
                                 pre_request=self.pre_request)
        except Exception as exc:
            self.error(exc)

    def request_headers(self):
        '''Fill request headers from the environ dictionary and
        modify them via the list of :attr:`headers_middleware`.

        Every ``HTTP_*`` environ key becomes a header (``HTTP_X_FOO`` ->
        ``X-FOO``). :data:`ENVIRON_HEADERS` are copied from their
        un-prefixed CGI keys when non-empty.

        :return: the headers to send to the target uri.
        '''
        environ = self.environ
        headers = Headers(kind='client')
        # NOTE(review): every HTTP_* key is forwarded, including hop-by-hop
        # ones such as Proxy-Authorization / Proxy-Connection (RFC 9110
        # section 7.6.1). Consider stripping them via a headers middleware.
        for key, value in environ.items():
            if key.startswith('HTTP_'):
                headers[key[5:].replace('_', '-')] = value
        for head in ENVIRON_HEADERS:
            value = environ.get(head.replace('-', '_').upper())
            if value:
                headers[head] = value
        for middleware in self.wsgi.headers_middleware:
            middleware(environ, headers)
        return headers

    def error(self, exc):
        '''Propagate *exc* to the downstream response.

        If :attr:`future` is still pending, it fails with *exc*. Otherwise
        the response has already been settled (e.g. a tunnel was
        established), so the error can only be logged.
        '''
        if not self.future.done():
            self.future.set_exception(exc)
        else:
            logger.error(str(exc))

    @noerror
    def pre_request(self, response, exc=None):
        '''Start the tunnel.

        This callback fires once a connection with the upstream server is
        established. It is wrapped with :func:`noerror`, presumably so it
        is skipped when the request failed — confirm against pulsar.

        For ``CONNECT`` both connections are upgraded to
        :class:`StreamTunnel`, each relaying bytes to the other.
        Otherwise, handlers are bound that stream the upstream response
        to the downstream connection.
        '''
        if response.request.method == 'CONNECT':
            # proxy - server connection
            upstream = response.connection
            # client - proxy connection
            dostream = self.environ['pulsar.connection']
            # Upgrade downstream connection
            dostream.upgrade(partial(StreamTunnel, upstream))
            # Upgrade upstream connection
            upstream.upgrade(partial(StreamTunnel, dostream))
            self.start_response('200 Connection established', [])
            # send empty byte so that headers are sent
            self.future.set_result([b''])
            response.abort_request()
        else:
            response.bind_event('data_processed', self.data_processed)
            response.bind_event('post_request', self.post_request)

    def data_processed(self, response, data=None, **kw):
        '''Write a chunk of upstream response *data* directly to the
        downstream connection.'''
        self.environ['pulsar.connection'].write(data)

    def post_request(self, _, exc=None):
        '''Finish a plain HTTP proxied response.

        The body was already written to the downstream connection by
        :meth:`data_processed`, so the WSGI response is aborted rather than
        completed. The guard avoids ``InvalidStateError`` when
        :meth:`error` has already settled :attr:`future`.
        '''
        if not self.future.done():
            self.future.set_exception(wsgi.AbortWsgi())
class DataIterator:
    '''Iterable request body forwarded to the upstream server.

    Wraps the ``wsgi.input`` entry of *response*'s environ (``None`` when
    absent). When iterated, it yields the result of a single ``read()`` call
    on that stream's ``reader``.
    '''
    def __init__(self, response):
        environ = response.environ
        self.stream = environ.get('wsgi.input')
        self.response = response

    def __iter__(self):
        # NOTE(review): the whole body comes from one read() call; whether
        # that is bytes or an awaitable depends on pulsar's stream type.
        reader = self.stream.reader
        body = reader.read()
        yield body
class StreamTunnel(pulsar.ProtocolConsumer):
    ''':class:`.ProtocolConsumer` relaying raw (e.g. TLS-encrypted) bytes
    between the downstream client and the upstream server.

    Instances are installed by upgrading the standard HTTP protocol
    consumer of a connection.

    .. attribute:: tunnel

        Connection to the downstream client or upstream server.
    '''
    # Placeholders, presumably read by pulsar's HTTP machinery — confirm.
    headers = None
    status_code = None
    # When set, connection_made() starts this consumer with it.
    http_request = None

    def __init__(self, tunnel, loop=None):
        super().__init__(loop)
        self.tunnel = tunnel

    def connection_made(self, connection):
        self.logger.debug('Tunnel connection %s made', connection)
        # Closing one side of the tunnel closes the other side too.
        connection.bind_event('connection_lost', self._close_tunnel)
        pending = self.http_request
        if pending:
            self.start(pending)

    def data_received(self, data):
        '''Forward *data* verbatim to the other side of the tunnel.

        Write errors are ignored once the tunnel is closed. Otherwise they
        propagate.
        '''
        peer = self.tunnel
        try:
            return peer.write(data)
        except Exception:
            if peer.closed:
                return None
            raise

    def _close_tunnel(self, arg, exc=None):
        peer = self.tunnel
        if peer.closed:
            return
        # Defer to the next loop iteration rather than closing inline.
        self._loop.call_soon(peer.close)
def server(name='proxy-server', headers_middleware=None,
           server_software=None, **kwargs):
    '''Create a WSGI proxy server.

    :param name: name of the WSGI server application.
    :param headers_middleware: list of header middleware callables. When
        ``None`` it defaults to ``[x_forwarded_for]``; pass an empty list
        to disable header processing.
    :param server_software: ``server_software`` value for the WSGI server.
        Falls back to :data:`SERVER_SOFTWARE` when falsy.
    :param kwargs: extra keyword arguments forwarded to
        :class:`pulsar.apps.wsgi.WSGIServer`.
    :return: the (not yet started) WSGI server.
    '''
    middleware = ([x_forwarded_for] if headers_middleware is None
                  else headers_middleware)
    handler = ProxyServerWsgiHandler(middleware)
    kwargs['server_software'] = server_software or SERVER_SOFTWARE
    return wsgi.WSGIServer(handler, name=name, **kwargs)
if __name__ == '__main__':
    # Run the proxy with the default configuration
    # (x-forwarded-for header middleware).
    proxy_server = server()
    proxy_server.start()