Source code for pulsar.apps.socket

'''Asynchronous application for serving requests
on sockets. This is the base class of :class:`.WSGIServer`.
All is needed by a :class:`.SocketServer` application is a callable
which build a :class:`.ProtocolConsumer` for each new client request
received.
This is an example of a script for an Echo server::

    import pulsar
    from pulsar.apps.socket import SocketServer

    class EchoServerProtocol(pulsar.ProtocolConsumer):
        ...

    if __name__ == '__main__':
        SocketServer(EchoServerProtocol).start()

Check the :ref:`echo server example <tutorials-writing-clients>` for detailed
implementation of the ``EchoServerProtocol`` class.

.. _socket-server-settings:

Socket Server Settings
==============================
All standard :ref:`application settings <settings>` can be applied to a
:class:`SocketServer`. In addition, the following are
specific to sockets and can be used to fine tune your application:

bind
------
To specify the address to bind the server to::

    python script.py --bind 127.0.0.1:8070

This will accept connection from the 127.0.0.1 network interface and port 8070.
This means pulsar will be able to accept connections only from clients
running into the same computer it is running.

On the other hand, it is possible to listen for connections from all
the network interfaces available on the server by specifying ``:<port>``.
For example, this will listen for both ipv4 and ipv6 sockets **on all hosts**
on port 8080::

    python script.py --bind :8080

**Use this notation when running pulsar inside Docker or any other container**.

You can bind to a random available port by specifying 0 as the port number::

    python script.py --bind :0

useful during testing.


backlog
---------
To specify the maximum number of queued connections you can use the
:ref:`backlog <setting-backlog>` settings. For example::

    python script.py --backlog 1000

rarely used.

keep_alive
---------------
To control how long a server :class:`.Connection` is kept alive after the
last read from the remote client, one can use the
:ref:`keep-alive <setting-keep_alive>` setting::

    python script.py --keep-alive 10

will close client connections which have been idle for 10 seconds.

.. _socket-server-ssl:

TLS/SSL support
------------------------
Transport Layer Security (often known as Secure Sockets Layer) is handled by
the :ref:`cert-file <setting-cert_file>` and :ref:`key-file <setting-key_file>`
settings::

    python script.py --cert-file server.crt --key-file server.key


.. _socket-server-concurrency:

Concurrency
==================

When running a :class:`SocketServer` in multi-process mode (default),
the application, create a listening socket in the parent (Arbiter) process
and then spawn several process-based actors which listen on the
same shared socket.
This is how pre-forking servers operate.

When running a :class:`SocketServer` in threading mode::

    python script.py --concurrency thread

the number of :class:`.Actor` serving the application is set
to ``0`` so that the application is actually served by the
arbiter event-loop (we refer this to a single process server).
This configuration is used when debugging, testing, benchmarking or on small
load servers.

In addition, a :class:`SocketServer` in multi-process mode is only available
for:

* Posix systems.
* Windows running python 3.2 or above (python 2 on windows does not support
  the creation of sockets from file descriptors).

Check the :meth:`SocketServer.monitor_start` method for implementation details.
'''
import os
import socket
from math import log
from random import lognormvariate
from functools import partial
import asyncio
try:
    import ssl
except ImportError:     # pragma    nocover
    ssl = None

import pulsar
from pulsar import TcpServer, DatagramServer, Connection, ImproperlyConfigured
from pulsar import as_coroutine
from pulsar.utils.internet import parse_address
from pulsar.utils.config import pass_through


class SocketSetting(pulsar.Setting):
    virtual = True
    app = 'socket'
    section = "Socket Servers"


class Bind(SocketSetting):
    name = "bind"
    flags = ["-b", "--bind"]
    meta = "ADDRESS"
    default = "127.0.0.1:{0}".format(pulsar.DEFAULT_PORT)
    desc = """\
        The socket to bind.

        A string of the form: ``HOST``, ``HOST:PORT``, ``unix:PATH``.
        An IP is a valid HOST. Specify ``:PORT`` to listen for connections
        from all the network interfaces available on the server.
        """


class KeepAlive(SocketSetting):
    name = "keep_alive"
    flags = ["--keep-alive"]
    validator = pulsar.validate_pos_int
    type = int
    default = 15
    desc = """\
        The number of seconds to keep an idle client connection
        open."""


class Backlog(SocketSetting):
    name = "backlog"
    flags = ["--backlog"]
    validator = pulsar.validate_pos_int
    type = int
    default = 2048
    desc = """\
        The maximum number of queued connections in a socket.

        This refers to the number of clients that can be waiting to be served.
        Exceeding this number results in the client getting an error when
        attempting to connect. It should only affect servers under significant
        load.
        Must be a positive integer. Generally set in the 64-2048 range.
        """


class KeyFile(SocketSetting):
    name = "key_file"
    flags = ["--key-file"]
    meta = "FILE"
    default = None
    desc = """\
    SSL key file
    """


class CertFile(SocketSetting):
    name = "cert_file"
    flags = ["--cert-file"]
    meta = "FILE"
    default = None
    desc = """\
    SSL certificate file
    """


[docs]class SocketServer(pulsar.Application): '''A :class:`.Application` which serve application on a socket. It bind a socket to a given address and listen for requests. The request handler is constructed from the callable passed during initialisation. .. attribute:: address The socket address, available once the application has started. ''' name = 'socket' cfg = pulsar.Config(apps=['socket'])
[docs] def protocol_factory(self): '''Factory of :class:`.ProtocolConsumer` used by the server. By default it returns the :meth:`.Application.callable`. ''' return partial(Connection, self.cfg.callable)
[docs] async def monitor_start(self, monitor): '''Create the socket listening to the ``bind`` address. If the platform does not support multiprocessing sockets set the number of workers to 0. ''' cfg = self.cfg loop = monitor._loop if (not pulsar.platform.has_multiProcessSocket or cfg.concurrency == 'thread'): cfg.set('workers', 0) if not cfg.address: raise ImproperlyConfigured('Could not open a socket. ' 'No address to bind to') address = parse_address(self.cfg.address) if cfg.cert_file or cfg.key_file: if not ssl: raise RuntimeError('No support for ssl') if cfg.cert_file and not os.path.exists(cfg.cert_file): raise ImproperlyConfigured('cert_file "%s" does not exist' % cfg.cert_file) if cfg.key_file and not os.path.exists(cfg.key_file): raise ImproperlyConfigured('key_file "%s" does not exist' % cfg.key_file) # First create the sockets try: server = await loop.create_server(asyncio.Protocol, *address) except socket.error as e: raise ImproperlyConfigured(e) else: self.monitor_sockets(monitor, server.sockets)
def monitor_sockets(self, monitor, sockets): addresses = [] loop = monitor._loop for sock in sockets: addresses.append(sock.getsockname()) fd = sock.fileno() loop.remove_reader(fd) monitor.sockets = sockets self.cfg.addresses = addresses def actorparams(self, monitor, params): params['sockets'] = monitor.sockets
[docs] async def worker_start(self, worker, exc=None): '''Start the worker by invoking the :meth:`create_server` method. ''' if not exc: server = await self.create_server(worker) server.bind_event('stop', lambda _, **kw: worker.stop()) worker.servers[self.name] = server
async def worker_stopping(self, worker, exc=None): server = worker.servers.get(self.name) if server: await server.close() close = getattr(self.cfg.callable, 'close', None) if hasattr(close, '__call__'): try: await as_coroutine(close()) except Exception: pass def worker_info(self, worker, info): server = worker.servers.get(self.name) if server: info['%sserver' % self.name] = server.info() return info
[docs] def server_factory(self, *args, **kw): '''Create a :class:`.TcpServer`. ''' return TcpServer(*args, **kw)
# INTERNALS
[docs] async def create_server(self, worker): '''Create the Server which will listen for requests. :return: a :class:`.TcpServer`. ''' sockets = worker.sockets cfg = self.cfg max_requests = cfg.max_requests if max_requests: max_requests = int(lognormvariate(log(max_requests), 0.2)) server = self.server_factory(self.protocol_factory(), worker._loop, sockets=sockets, max_requests=max_requests, keep_alive=cfg.keep_alive, name=self.name, logger=self.logger) for event in ('connection_made', 'pre_request', 'post_request', 'connection_lost'): callback = getattr(cfg, event) if callback != pass_through: server.bind_event(event, callback) await server.start_serving(cfg.backlog, sslcontext=self.sslcontext()) return server
def sslcontext(self): cfg = self.cfg if cfg.cert_file and cfg.key_file: ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ctx.load_cert_chain(certfile=cfg.cert_file, keyfile=cfg.key_file) return ctx
[docs]class UdpSocketServer(SocketServer): '''A :class:`.SocketServer` which serves application on a UDP sockets. It binds a socket to a given address and listen for requests. The request handler is constructed from the callable passed during initialisation. .. attribute:: address The socket address, available once the application has started. ''' name = 'udpsocket' cfg = pulsar.Config(apps=['socket'])
[docs] def protocol_factory(self): '''Return the :class:`.DatagramProtocol` factory. ''' return self.cfg.callable
[docs] async def monitor_start(self, monitor): '''Create the socket listening to the ``bind`` address. If the platform does not support multiprocessing sockets set the number of workers to 0. ''' cfg = self.cfg loop = monitor._loop if (not pulsar.platform.has_multiProcessSocket or cfg.concurrency == 'thread'): cfg.set('workers', 0) if not cfg.address: raise pulsar.ImproperlyConfigured('Could not open a socket. ' 'No address to bind to') address = parse_address(self.cfg.address) # First create the sockets transport, _ = await loop.create_datagram_endpoint( asyncio.DatagramProtocol, address) sock = transport.get_extra_info('socket') transport._sock = DummySock() transport.close() self.monitor_sockets(monitor, [sock])
def actorparams(self, monitor, params): params.update({'sockets': monitor.sockets})
[docs] def server_factory(self, *args, **kw): '''By default returns a new :class:`.DatagramServer`. ''' return DatagramServer(*args, **kw)
# INTERNALS
[docs] async def create_server(self, worker): '''Create the Server which will listen for requests. :return: the server obtained from :meth:`server_factory`. ''' cfg = self.cfg max_requests = cfg.max_requests if max_requests: max_requests = int(lognormvariate(log(max_requests), 0.2)) server = self.server_factory(self.protocol_factory(), worker._loop, sockets=worker.sockets, max_requests=max_requests, name=self.name, logger=self.logger) server.bind_event('stop', lambda _, **kw: worker.stop()) for event in ('pre_request', 'post_request'): callback = getattr(cfg, event) if callback != pass_through: server.bind_event(event, callback) await server.create_endpoint() return server
class DummySock: def close(self): pass