Source code for examples.echo.manage
'''
This example illustrates how to write a simple TCP Echo server and client pair.
The example is simple because the client and server protocols are symmetrical
and therefore the :class:`EchoProtocol` will also be used as based class for
:class:`EchoServerProtocol`.
The code for this example is located in the :mod:`examples.echo.manage`
module.
Run The example
====================
To run the server::
python manage.py
Open a new shell, in this directory, launch python and type::
>>> from manage import Echo
>>> echo = Echo(('localhost',8060))
>>> echo(b'Hello!')
b'Hello!'
Writing the Client
=========================
The first step is to write a small class handling a connection
pool with the remote server. The :class:`Echo` class does just that,
it subclasses the handy :class:`.AbstractClient` and uses
the asynchronous :class:`.Pool` of connections as backbone.
The second step is the implementation of the :class:`.EchoProtocol`,
a subclass of :class:`.ProtocolConsumer`.
The :class:`EchoProtocol` is needed for two reasons:
* It encodes and sends the request to the remote server via the
:meth:`~EchoProtocol.start_request` method.
* It listens for incoming data from the remote server via the
:meth:`~EchoProtocol.data_received` method.
To wait for the response message one can ``await`` from the
:attr:`.ProtocolConsumer.on_finished` event.
Implementation
==================
Echo Client Protocol
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autoclass:: EchoProtocol
:members:
:member-order: bysource
Echo Server Protocol
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autoclass:: EchoServerProtocol
:members:
:member-order: bysource
Echo Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autoclass:: Echo
:members:
:member-order: bysource
.. automethod:: __call__
Echo Server
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autofunction:: server
'''
from functools import partial
import pulsar
from pulsar import Pool, Connection, AbstractClient, ProtocolError
from pulsar.apps.socket import SocketServer
[docs]class EchoProtocol(pulsar.ProtocolConsumer):
'''An echo :class:`~.ProtocolConsumer` for client and servers.
The only difference between client and server is the implementation
of the :meth:`response` method.
'''
separator = b'\r\n\r\n'
'''A separator for messages.'''
buffer = b''
'''The buffer for long messages'''
[docs] def data_received(self, data):
'''Implements the :meth:`~.ProtocolConsumer.data_received` method.
It simply search for the :attr:`separator` and, if found, it invokes
the :meth:`response` method with the value of the message.
'''
if self.buffer:
data = self.buffer + data
idx = data.find(self.separator)
if idx >= 0: # we have a full message
idx += len(self.separator)
data, rest = data[:idx], data[idx:]
self.buffer = self.response(data, rest)
self.finished()
return rest
else:
self.buffer = data
[docs] def start_request(self):
'''Override :meth:`~.ProtocolConsumer.start_request` to write
the message ended by the :attr:`separator` into the transport.
'''
self.transport.write(self._request + self.separator)
[docs] def response(self, data, rest):
'''Clients return the message so that the
:attr:`.ProtocolConsumer.on_finished` is called back with the
message value, while servers sends the message back to the client.
'''
if rest:
raise ProtocolError
return data[:-len(self.separator)]
[docs]class EchoServerProtocol(EchoProtocol):
'''The :class:`EchoProtocol` used by the echo :func:`server`.
'''
[docs] def response(self, data, rest):
'''Override :meth:`~EchoProtocol.response` method by writing the
``data`` received back to the client.
'''
self.transport.write(data)
data = data[:-len(self.separator)]
# If we get a QUIT message, close the transport.
# Used by the test suite.
if data == b'QUIT':
self.transport.close()
return data
[docs]class Echo(AbstractClient):
'''A client for the echo server.
:param address: set the :attr:`address` attribute
:param full_response: set the :attr:`full_response` attribute
:param pool_size: used when initialising the connection :attr:`pool`.
:param loop: Optional event loop to set the :attr:`_loop` attribute.
.. attribute:: _loop
The event loop used by the client IO requests.
The event loop is stored at this attribute so that asynchronous
method decorators such as :func:`.task` can be used.
.. attribute:: address
remote server TCP address.
.. attribute:: pool
Asynchronous connection :class:`.Pool`.
.. attribute:: full_response
Flag indicating if the callable method should return the
:class:`EchoProtocol` handling the request (``True``) or
the server response message (``False``).
Default: ``False``
'''
protocol_factory = partial(Connection, EchoProtocol)
def __init__(self, address, full_response=False, pool_size=10, loop=None):
super().__init__(loop)
self.address = address
self.full_response = full_response
self.pool = Pool(self.connect, pool_size, self._loop)
def connect(self):
return self.create_connection(self.address)
[docs] def __call__(self, message):
'''Send a ``message`` to the server and wait for a response.
:return: a :class:`.Future`
'''
result = self._call(message)
if not self._loop.is_running():
return self._loop.run_until_complete(result)
else:
return result
async def _call(self, message):
connection = await self.pool.connect()
with connection:
consumer = connection.current_consumer()
consumer.start(message)
await consumer.on_finished
return consumer if self.full_response else consumer.buffer
[docs]def server(name=None, description=None, **kwargs):
'''Create the :class:`.SocketServer` with :class:`EchoServerProtocol`
as protocol factory.
'''
name = name or 'echoserver'
description = description or 'Echo Server'
return SocketServer(EchoServerProtocol, name=name,
description=description, **kwargs)
def log_connection(connection, exc=None):
if not exc:
connection.logger.info('Got a new connection!')
if __name__ == '__main__': # pragma nocover
server(connection_made=log_connection).start()