Protocols/Transports API¶
This part of the pulsar API is about classes responsible for
implementing the Protocol/Transport paradigm. They are based on
asyncio.Protocol
and asyncio.DatagramProtocol
classes.
Protocols¶
PulsarProtocol¶
-
class
pulsar.async.protocols.
PulsarProtocol
(loop, session=1, producer=None, logger=None, **kw)[source]¶ A mixin class for both
Protocol
andDatagramProtocol
.A
PulsarProtocol
is anEventHandler
which has two one time events:connection_made
connection_lost
-
close
()[source]¶ Close by closing the
transport
Return the
connection_lost
event which can be used to wait for complete transport closure.
-
connection_made
(transport)[source]¶ Sets the
transport
, fire theconnection_made
event and adds atimeout
for idle connections.
-
session
¶ Connection session number.
Passed during initialisation by the
producer
. Usually an integer representing the number of separate connections the producer has processed at the time it created thisProtocol
.
-
transport
¶ The transport for this protocol.
Available once the
connection_made()
is called.
Protocol¶
Connection¶
-
class
pulsar.async.protocols.
Connection
(consumer_factory=None, timeout=None, **kw)[source]¶ A
FlowControl
to handle multiple TCP requests/responses.It is a class which acts as bridge between a transport and a
ProtocolConsumer
. It routes data arriving from the transport to thecurrent_consumer()
.-
_consumer_factory
¶ A factory of
ProtocolConsumer
.
-
_processed
¶ number of separate requests processed.
-
current_consumer
()[source]¶ The
ProtocolConsumer
currently handling incoming data.This instance will receive data when this connection get data from the
transport
via thedata_received()
method.If no consumer is available, build a new one and return it.
-
data_received
(data)[source]¶ Delegates handling of data to the
current_consumer()
.Once done set a timeout for idle connections when a
timeout
is a positive number (of seconds).
-
upgrade
(consumer_factory)[source]¶ Upgrade the
_consumer_factory()
callable.This method can be used when the protocol specification changes during a response (an example is a WebSocket request/response, or HTTP tunneling).
This method adds a
post_request
callback to thecurrent_consumer()
to build a new consumer with the new_consumer_factory()
.Parameters: consumer_factory – the new consumer factory (a callable accepting no parameters) Returns: None
.
-
Protocol Consumer¶
-
class
pulsar.async.protocols.
ProtocolConsumer
(loop=None, one_time_events=None, many_times_events=None)[source]¶ The consumer of data for a server or client
Connection
.It is responsible for receiving incoming data from an end point via the
Connection.data_received()
method, decoding (parsing) and, possibly, writing back to the client or server via thetransport
attribute.Note
For server consumers,
data_received()
is the only method to implement. For client consumers,start_request()
should also be implemented.A
ProtocolConsumer
is a subclass ofEventHandler
and it has two default one time events:pre_request
fired when the request is received (for servers) or just before is sent (for clients). This occurs just before thestart_request()
method.post_request
fired when the request is done. Theon_finished
attribute is a shortcut for thepost_request
OneTime
event and therefore can be used to wait for the request to have received a full response (clients).
In addition, it has two many times events:
data_received
fired when new data is received from the transport but not yet processed (before thedata_received()
method is invoked)data_processed
fired just after data has been consumed (after thedata_received()
method)
Note
A useful example on how to use the
data_received
event is the wsgi proxy server.-
connection
¶ The
Connection
of this consumer.
-
connection_lost
(exc)[source]¶ Called by the
connection
when the transport is closed.By default it calls the
finished()
method. It can be overwritten to handle the potential exceptionexc
.
-
connection_made
(connection)[source]¶ Called by a
Connection
when it starts using this consumer.By default it does nothing.
-
data_received
(data)[source]¶ Called when some data is received.
This method must be implemented by subclasses for both server and client consumers.
The argument is a bytes object.
-
on_finished
¶ Event fired once a full response to a request is received. It is the
post_request
one time event.
-
start
(request=None)[source]¶ Starts processing the request for this protocol consumer.
There is no need to override this method, implement
start_request()
instead. If eitherconnection
ortransport
are missing, aRuntimeError
occurs.For server side consumer, this method simply fires the
pre_request
event.
-
start_request
()[source]¶ Starts a new request.
Invoked by the
start()
method to kick start the request with remote server. For serverProtocolConsumer
this method is not invoked at all.For clients this method should be implemented and it is critical method where errors caused by stale socket connections can arise. This method should not be called directly. Use
start()
instead. Typically one writes some data from therequest
into the transport. Something like this:self.transport.write(self.request.encode())
-
transport
¶ The
Transport
of this consumer
-
write
(data)[source]¶ Delegate writing to the underlying
Connection
Return an empty tuple or a
Future
Producers¶
Producers are factory of Protocol
with end-points.
They are used by both servers and clients classes.
Producer¶
-
class
pulsar.async.protocols.
Producer
(loop=None, protocol_factory=None, name=None, max_requests=None, logger=None)[source]¶ An Abstract
EventHandler
class for all producers of socket (client and servers)-
build_consumer
(consumer_factory)[source]¶ Build a consumer for a protocol.
This method can be used by protocols which handle several requests, for example the
Connection
class.Parameters: consumer_factory – consumer factory to use.
-
create_protocol
(**kw)[source]¶ Create a new protocol via the
protocol_factory()
This method increase the count of
sessions
and build the protocol passingself
as the producer.
-
requests_processed
¶ Total number of requests processed.
-
TCP Server¶
-
class
pulsar.async.protocols.
TcpServer
(protocol_factory, loop, address=None, name=None, sockets=None, max_requests=None, keep_alive=None, logger=None)[source]¶ A
Producer
of serverConnection
for TCP servers.-
_server
¶ A
Server
managed by this Tcp wrapper.Available once the
start_serving()
method has returned.
-
address
¶ Socket address of this server.
It is obtained from the first socket
getsockname
method.
-
create_protocol
()[source]¶ Override
Producer.create_protocol()
.
-
UDP¶
Classes for the (user) datagram protocol. UDP uses a simple transmission model with a minimum of protocol mechanism.
Datagram Protocol¶
Datagram Server¶
-
class
pulsar.async.protocols.
DatagramServer
(protocol_factory, loop=None, address=None, name=None, sockets=None, max_requests=None, logger=None)[source]¶ An
Producer
for serving UDP sockets.-
_transports
¶ A list of
DatagramTransport
.Available once the
create_endpoint()
method has returned.
-
Protocol Mixins¶
FlowControl¶
-
class
pulsar.async.mixins.
FlowControl
(low_limit=None, high_limit=None, **kw)[source]¶ A protocol mixin for flow control logic.
This implements the protocol methods
pause_writing()
,resume_writing()
.-
pause_writing
()[source]¶ Called by the transport when the buffer goes over the high-water mark
Successive calls to this method will fails unless
resume_writing()
is called first.
-
resume_writing
(exc=None)[source]¶ Resume writing.
Successive calls to this method will fails unless
pause_writing()
is called first.
-
Clients¶
This section introduces classes implementing the transport/protocol paradigm
for clients with several connections to a remote TcpServer
.
Abstract Client¶
Abstract UDP Client¶
Pool¶
-
class
pulsar.async.clients.
Pool
(creator, pool_size=10, loop=None, timeout=None, **kw)[source]¶ An asynchronous pool of open connections.
Open connections are either
in_use
oravailable
to be used. Available connection are placed in anasyncio.Queue
.This class is not thread safe.
-
available
¶ Number of available connections in the pool.
-
closed
¶ True when this pool is closed
-
connect
()[source]¶ Get a connection from the pool.
The connection is either a new one or retrieved from the
available
connections in the pool.Returns: a Future
resulting in the connection.
-
in_use
¶ The number of connections in use.
These connections are not available until they are released back to the pool.
-
pool_size
¶ The maximum number of open connections allowed.
If more connections are requested, the request is queued and a connection returned as soon as one becomes available.
-
Pool Connection¶
-
class
pulsar.async.clients.
PoolConnection
(pool, connection)[source]¶ A wrapper for a
Connection
in a connectionPool
.-
pool
¶ The
Pool
which created thisPoolConnection
-
connection
¶ The underlying socket connection.
-
close
(discard=False)[source]¶ Close this pool connection by releasing the underlying
connection
back to thepool
.
-
detach
(discard=True)[source]¶ Remove the underlying
connection
from the connectionpool
.
-