Source code for examples.echoudp.manage
"""
This example illustrates how to write a UDP Echo server and client pair.
The code for this example is located in the :mod:`examples.echoudp.manage`
module.
Run The example
====================
To run the server::
python manage.py
Open a new shell, in this directory, launch python and type::
>>> from manage import Echo
>>> echo = Echo(('localhost', 8060))
>>> echo(b'Hello!\\n')
b'Hello!'
Writing the Client
=========================
The first step is to write a small class handling a connection
pool with the remote server. The :class:`Echo` class does just that,
it subclass the handy :class:`.AbstractUdpClient` and uses
the asynchronous :class:`.Pool` of connections as backbone.
The second step is the implementation of the :class:`EchoUdpProtocol`,
a subclass of :class:`.DatagramProtocol`.
The :class:`EchoUdpProtocol` is needed for two reasons:
* It encodes and sends the request to the remote server
* It listens for incoming data from the remote server via the
:meth:`~EchoUdpProtocol.datagram_received` method.
Implementation
==================
Echo UDP Protocol
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autoclass:: EchoUdpProtocol
:members:
:member-order: bysource
Echo Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autoclass:: Echo
:members:
:member-order: bysource
.. automethod:: __call__
Echo Server
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autofunction:: server
"""
import pulsar
from pulsar import Pool, create_future, DatagramProtocol
from pulsar.utils.pep import to_bytes
from pulsar.apps.socket import UdpSocketServer
[docs]class EchoUdpProtocol(DatagramProtocol):
'''A base :class:`.DatagramProtocol` for UDP echo clients and servers.
The only difference between client and server is the implementation
of the :meth:`response` method.
'''
separator = b'\n'
'''A separator for messages.'''
buffer = None
'''The buffer for long messages'''
[docs] def datagram_received(self, data, addr):
'''Handle data from ``addr``.
'''
if self.buffer and addr in self.buffer:
data = self.buffer.pop(addr) + data
while data:
idx = data.find(self.separator)
if idx >= 0: # we have a full message
idx += len(self.separator)
chunk, data = data[:idx], data[idx:]
self.response(chunk, addr)
else:
if self.buffer is None:
self.buffer = {}
self.buffer[addr] = data
data = None
[docs] def response(self, data, addr):
'''Abstract response handler'''
raise NotImplementedError
class EchoUdpClientProtocol(EchoUdpProtocol):
_waiting = None
def send(self, message):
assert self._waiting is None
self._waiting = d = create_future(self._loop)
self._transport.sendto(to_bytes(message)+self.separator)
return d
def response(self, data, addr):
'''Got a full response'''
d, self._waiting = self._waiting, None
if d:
d.set_result(data[:-len(self.separator)])
class EchoUdpServerProtocol(EchoUdpProtocol):
'''The :class:`EchoUdpProtocol` used by the echo udp :func:`server`.
'''
def response(self, data, addr):
'''Override :meth:`~EchoProtocol.response` method by writing the
``data`` received back to the client.
'''
self._transport.sendto(data, addr)
[docs]class Echo(pulsar.AbstractUdpClient):
'''A client for the echo server.
:param address: set the :attr:`address` attribute
:param pool_size: used when initialising the connection :attr:`pool`.
:param loop: Optional event loop to set the :attr:`_loop` attribute.
.. attribute:: _loop
The event loop used by the client IO requests.
The event loop is stored at this attribute so that asynchronous
method decorators such as :func:`.task` can be used.
.. attribute:: address
remote server UDP address.
.. attribute:: pool
Asynchronous client protocol :class:`.Pool`.
'''
protocol_factory = EchoUdpClientProtocol
def __init__(self, address, pool_size=5, loop=None):
super().__init__(loop)
self.address = address
self.pool = Pool(self.create_endpoint, pool_size, self._loop)
def create_endpoint(self):
return self.create_datagram_endpoint(remote_addr=self.address)
[docs] def __call__(self, message):
'''Send a ``message`` to the server and wait for a response.
:return: a :class:`.Future`
'''
result = self._call(message)
if not self._loop.is_running():
return self._loop.run_until_complete(result)
else:
return result
async def _call(self, message):
protocol = await self.pool.connect()
with protocol:
result = await protocol.send(message)
return result
[docs]def server(name=None, description=None, **kwargs):
'''Create the :class:`.UdpSocketServer`.
'''
name = name or 'echoudpserver'
description = description or 'Echo Udp Server'
return UdpSocketServer(EchoUdpServerProtocol, name=name,
description=description, **kwargs)
if __name__ == '__main__': # pragma nocover
server().start()