Protocols/Transports API¶
This part of the pulsar API covers the classes responsible for
implementing the Protocol/Transport paradigm. They are based on the
asyncio.Protocol and asyncio.DatagramProtocol classes.
Protocols¶
PulsarProtocol¶
-
class
pulsar.async.protocols.PulsarProtocol(loop, session=1, producer=None, logger=None, **kw)[source]¶ A mixin class for both Protocol and DatagramProtocol.
A PulsarProtocol is an EventHandler which has two one-time events: connection_made and connection_lost.
-
close()[source]¶ Close the protocol by closing the transport.
Return the connection_lost event, which can be used to wait for complete transport closure.
-
connection_made(transport)[source]¶ Sets the transport, fires the connection_made event and adds a timeout for idle connections.
-
session¶ Connection session number.
Passed during initialisation by the producer. Usually an integer representing the number of separate connections the producer has processed at the time it created this Protocol.
-
transport¶ The transport for this protocol.
Available once connection_made() has been called.
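The two one-time events can be pictured with plain asyncio. The sketch below is not pulsar's implementation: `SketchProtocol`, `FakeTransport` and the `made`/`lost` attributes are hypothetical names, and an `asyncio.Future` stands in for each one-time event.

```python
import asyncio


class SketchProtocol(asyncio.Protocol):
    """Hypothetical protocol with two one-time events modelled as futures."""

    def __init__(self, loop, session=1):
        self.session = session
        self.transport = None
        self.made = loop.create_future()   # stands in for connection_made
        self.lost = loop.create_future()   # stands in for connection_lost

    def connection_made(self, transport):
        self.transport = transport
        self.made.set_result(self)

    def connection_lost(self, exc):
        self.lost.set_result(exc)

    def close(self):
        # Close the transport and return the awaitable "lost" event.
        if self.transport is not None:
            self.transport.close()
        return self.lost


class FakeTransport:
    """Test double: reports closure to the protocol on the next loop turn."""

    def __init__(self, loop, protocol):
        self._loop = loop
        self._protocol = protocol

    def close(self):
        self._loop.call_soon(self._protocol.connection_lost, None)


async def demo():
    loop = asyncio.get_running_loop()
    protocol = SketchProtocol(loop, session=3)
    protocol.connection_made(FakeTransport(loop, protocol))
    await protocol.made
    exc = await protocol.close()   # resolves once the transport is closed
    return protocol.session, exc


result = asyncio.run(demo())
```

Awaiting the value returned by `close()` mirrors the idea of waiting on the `connection_lost` event for complete transport closure.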
Protocol¶
Connection¶
-
class
pulsar.async.protocols.Connection(consumer_factory=None, timeout=None, **kw)[source]¶ A FlowControl to handle multiple TCP requests/responses.
It is a class which acts as a bridge between a transport and a ProtocolConsumer. It routes data arriving from the transport to the current_consumer().
-
_consumer_factory¶ A factory of ProtocolConsumer.
-
_processed¶ Number of separate requests processed.
-
current_consumer()[source]¶ The ProtocolConsumer currently handling incoming data.
This instance will receive data when this connection gets data from the transport via the data_received() method.
If no consumer is available, build a new one and return it.
-
data_received(data)[source]¶ Delegates handling of data to the current_consumer().
Once done, sets a timeout for idle connections when timeout is a positive number (of seconds).
-
upgrade(consumer_factory)[source]¶ Upgrade the _consumer_factory() callable.
This method can be used when the protocol specification changes during a response (an example is a WebSocket request/response, or HTTP tunneling).
This method adds a post_request callback to the current_consumer() to build a new consumer with the new _consumer_factory().
Parameters: consumer_factory – the new consumer factory (a callable accepting no parameters)
Returns: None.
-
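The routing and upgrade behaviour described above can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not pulsar's code: `SketchConsumer`, `SketchConnection`, the list-based `post_request` callbacks and the "blank line ends a request" rule are all hypothetical.

```python
class SketchConsumer:
    """Hypothetical consumer: a request ends when data ends with a blank line."""

    def __init__(self, label):
        self.label = label
        self.chunks = []
        self.finished = False
        self.post_request = []   # callbacks run when the request is done

    def data_received(self, data):
        self.chunks.append(data)
        if data.endswith(b"\n\n"):
            self.finished = True
            for callback in self.post_request:
                callback(self)


class SketchConnection:
    """Hypothetical bridge between a transport and the current consumer."""

    def __init__(self, consumer_factory):
        self._consumer_factory = consumer_factory
        self._current = None
        self._processed = 0

    def current_consumer(self):
        # Build a consumer lazily if none is handling data right now.
        if self._current is None:
            self._current = self._consumer_factory()
            self._current.post_request.append(self._release)
            self._processed += 1
        return self._current

    def _release(self, consumer):
        self._current = None

    def data_received(self, data):
        self.current_consumer().data_received(data)

    def upgrade(self, consumer_factory):
        # Swap the factory; when the in-flight request finishes, build
        # the next consumer from the new factory.
        self._consumer_factory = consumer_factory
        if self._current is not None:
            self._current.post_request.append(
                lambda _: self.current_consumer())


conn = SketchConnection(lambda: SketchConsumer("http"))
conn.data_received(b"GET / HTTP/1.1\r\n")
first = conn.current_consumer()
conn.upgrade(lambda: SketchConsumer("websocket"))
conn.data_received(b"\n\n")            # finishes the first request
second = conn.current_consumer()
```

After the first request completes, `second` comes from the upgraded factory while `first` keeps the data it received.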
Protocol Consumer¶
-
class
pulsar.async.protocols.ProtocolConsumer(loop=None, one_time_events=None, many_times_events=None)[source]¶ The consumer of data for a server or client Connection.
It is responsible for receiving incoming data from an end point via the Connection.data_received() method, decoding (parsing) it and, possibly, writing back to the client or server via the transport attribute.
Note
For server consumers, data_received() is the only method to implement. For client consumers, start_request() should also be implemented.
A ProtocolConsumer is a subclass of EventHandler and it has two default one-time events:
pre_request: fired when the request is received (for servers) or just before it is sent (for clients). This occurs just before the start_request() method.
post_request: fired when the request is done. The on_finished attribute is a shortcut for the post_request OneTime event and can therefore be used to wait for the request to have received a full response (clients).
In addition, it has two many-times events:
data_received: fired when new data is received from the transport but not yet processed (before the data_received() method is invoked).
data_processed: fired just after data has been consumed (after the data_received() method).
Note
A useful example of how to use the data_received event is the wsgi proxy server.
-
connection¶ The Connection of this consumer.
-
connection_lost(exc)[source]¶ Called by the connection when the transport is closed.
By default it calls the finished() method. It can be overridden to handle the potential exception exc.
-
connection_made(connection)[source]¶ Called by a Connection when it starts using this consumer.
By default it does nothing.
-
data_received(data)[source]¶ Called when some data is received.
This method must be implemented by subclasses for both server and client consumers.
The argument is a bytes object.
-
on_finished¶ Event fired once a full response to a request is received. It is the post_request one-time event.
-
start(request=None)[source]¶ Starts processing the request for this protocol consumer.
There is no need to override this method; implement start_request() instead. If either connection or transport is missing, a RuntimeError is raised.
For a server-side consumer, this method simply fires the pre_request event.
-
start_request()[source]¶ Starts a new request.
Invoked by the start() method to kick-start the request with the remote server. For a server ProtocolConsumer this method is not invoked at all.
For clients this method should be implemented; it is a critical method where errors caused by stale socket connections can arise. This method should not be called directly. Use start() instead. Typically one writes some data from the request into the transport. Something like this:
    self.transport.write(self.request.encode())
-
transport¶ The Transport of this consumer.
-
write(data)[source]¶ Delegate writing to the underlying Connection.
Return an empty tuple or a Future.
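To make the relationship between the one-time and many-times events concrete, here is a small stand-alone sketch. It does not use pulsar: `SketchEvents`, `EchoConsumer`, `feed()` and the "newline ends the request" rule are hypothetical, and the exact firing order inside pulsar may differ.

```python
class SketchEvents:
    """Tiny event registry (hypothetical, not pulsar's EventHandler)."""

    def __init__(self):
        self._handlers = {}
        self._fired_once = set()

    def bind(self, name, handler):
        self._handlers.setdefault(name, []).append(handler)

    def fire(self, name, once=False):
        if once:
            if name in self._fired_once:
                return               # one-time events fire only once
            self._fired_once.add(name)
        for handler in self._handlers.get(name, ()):
            handler(name)


class EchoConsumer(SketchEvents):
    """Echoes every chunk; the request is done at the first newline."""

    def __init__(self, write):
        super().__init__()
        self._write = write

    def start(self):
        self.fire("pre_request", once=True)

    def feed(self, data):
        # Role played by the connection: wrap data_received() with events.
        self.fire("data_received")
        self.data_received(data)
        self.fire("data_processed")

    def data_received(self, data):
        self._write(data)
        if data.endswith(b"\n"):
            self.fire("post_request", once=True)


log, sent = [], []
consumer = EchoConsumer(sent.append)
for name in ("pre_request", "data_received",
             "data_processed", "post_request"):
    consumer.bind(name, log.append)

consumer.start()
consumer.feed(b"he")
consumer.feed(b"llo\n")
```

In this sketch `log` records each event name as it fires, so the many-times events appear once per chunk while `pre_request` and `post_request` appear once per request.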
Producers¶
Producers are factories of Protocol instances with end-points.
They are used by both server and client classes.
Producer¶
-
class
pulsar.async.protocols.Producer(loop=None, protocol_factory=None, name=None, max_requests=None, logger=None)[source]¶ An abstract EventHandler class for all producers of sockets (clients and servers).
-
build_consumer(consumer_factory)[source]¶ Build a consumer for a protocol.
This method can be used by protocols which handle several requests, for example the Connection class.
Parameters: consumer_factory – consumer factory to use.
-
create_protocol(**kw)[source]¶ Create a new protocol via the protocol_factory().
This method increases the count of sessions and builds the protocol, passing self as the producer.
-
requests_processed¶ Total number of requests processed.
-
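The session counting performed by create_protocol() can be illustrated as follows. This is a minimal sketch, assuming the producer passes the new session number and itself as keyword arguments; `SketchProducer` and `SketchProtocol` are hypothetical names, not pulsar classes.

```python
class SketchProtocol:
    """Hypothetical protocol that records its session and producer."""

    def __init__(self, session=1, producer=None, **kw):
        self.session = session
        self.producer = producer
        self.options = kw


class SketchProducer:
    """Hypothetical producer: counts sessions and builds protocols."""

    def __init__(self, protocol_factory):
        self.protocol_factory = protocol_factory
        self.sessions = 0

    def create_protocol(self, **kw):
        # Increase the session count, then build the protocol with
        # this producer attached.
        self.sessions += 1
        kw["session"] = self.sessions
        kw["producer"] = self
        return self.protocol_factory(**kw)


producer = SketchProducer(SketchProtocol)
first = producer.create_protocol()
second = producer.create_protocol(timeout=5)
```

Each protocol built this way carries the number of connections the producer had created at that point, which matches the description of the session attribute.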
TCP Server¶
-
class
pulsar.async.protocols.TcpServer(protocol_factory, loop, address=None, name=None, sockets=None, max_requests=None, keep_alive=None, logger=None)[source]¶ A Producer of server Connection for TCP servers.
-
_server¶ A Server managed by this Tcp wrapper.
Available once the start_serving() method has returned.
-
address¶ Socket address of this server.
It is obtained from the getsockname method of the first socket.
-
create_protocol()[source]¶ Override Producer.create_protocol().
-
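How an address can be read from the first listening socket is shown below with the standard asyncio server, which is the kind of object the _server attribute refers to. This is a sketch using only asyncio, not the TcpServer class itself; binding to port 0 on 127.0.0.1 is an assumption made so the example picks a free port.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    # Port 0 asks the operating system for any free port.
    server = await loop.create_server(asyncio.Protocol, "127.0.0.1", 0)
    # The address comes from the first listening socket.
    address = server.sockets[0].getsockname()
    server.close()
    await server.wait_closed()
    return address


address = asyncio.run(demo())
```

The returned value is a `(host, port)` pair for an IPv4 socket, with the port chosen by the operating system.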
UDP¶
Classes for the (user) datagram protocol. UDP uses a simple transmission model with a minimum of protocol mechanism.
Datagram Protocol¶
Datagram Server¶
-
class
pulsar.async.protocols.DatagramServer(protocol_factory, loop=None, address=None, name=None, sockets=None, max_requests=None, logger=None)[source]¶ A Producer for serving UDP sockets.
-
_transports¶ A list of DatagramTransport.
Available once the create_endpoint() method has returned.
-
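For readers unfamiliar with datagram endpoints, the following sketch serves a UDP socket with the standard asyncio API that these classes build on. It does not use DatagramServer; `EchoDatagram` and `OneShotClient` are hypothetical names, and the loopback address and 5 second timeout are assumptions.

```python
import asyncio


class EchoDatagram(asyncio.DatagramProtocol):
    """Send every datagram back to its sender."""

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data, addr)


class OneShotClient(asyncio.DatagramProtocol):
    """Send one message and store the first reply in a future."""

    def __init__(self, message, reply):
        self.message = message
        self.reply = reply

    def connection_made(self, transport):
        transport.sendto(self.message)

    def datagram_received(self, data, addr):
        if not self.reply.done():
            self.reply.set_result(data)


async def demo():
    loop = asyncio.get_running_loop()
    server_transport, _ = await loop.create_datagram_endpoint(
        EchoDatagram, local_addr=("127.0.0.1", 0))
    address = server_transport.get_extra_info("sockname")
    reply = loop.create_future()
    client_transport, _ = await loop.create_datagram_endpoint(
        lambda: OneShotClient(b"ping", reply), remote_addr=address)
    try:
        return await asyncio.wait_for(reply, timeout=5)
    finally:
        client_transport.close()
        server_transport.close()


echoed = asyncio.run(demo())
```

The server transport returned by `create_datagram_endpoint` is the kind of object a list such as _transports would hold.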
Protocol Mixins¶
FlowControl¶
-
class
pulsar.async.mixins.FlowControl(low_limit=None, high_limit=None, **kw)[source]¶ A protocol mixin for flow control logic.
This implements the protocol methods pause_writing() and resume_writing().
-
pause_writing()[source]¶ Called by the transport when the buffer goes over the high-water mark.
Successive calls to this method will fail unless resume_writing() is called first.
-
resume_writing(exc=None)[source]¶ Resume writing.
Successive calls to this method will fail unless pause_writing() is called first.
-
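The alternating pause/resume contract can be sketched with a single future that writers wait on. This is an illustration, not the FlowControl source: `SketchFlowControl`, its `drain()` helper and the choice of `RuntimeError` for out-of-order calls are assumptions.

```python
import asyncio


class SketchFlowControl(asyncio.Protocol):
    """Hypothetical mixin: writers wait while the transport is paused."""

    def __init__(self):
        self._waiter = None            # set while writing is paused

    def pause_writing(self):
        if self._waiter is not None:
            raise RuntimeError("pause_writing called twice")
        self._waiter = asyncio.get_running_loop().create_future()

    def resume_writing(self, exc=None):
        if self._waiter is None:
            raise RuntimeError("resume_writing without pause_writing")
        waiter, self._waiter = self._waiter, None
        if not waiter.done():
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def drain(self):
        # Block the caller until the transport buffer has drained.
        if self._waiter is not None:
            await self._waiter


async def demo():
    flow = SketchFlowControl()
    flow.pause_writing()
    try:
        flow.pause_writing()
        second_pause_failed = False
    except RuntimeError:
        second_pause_failed = True
    # Simulate the transport draining its buffer shortly afterwards.
    asyncio.get_running_loop().call_later(0.01, flow.resume_writing)
    await asyncio.wait_for(flow.drain(), timeout=1)
    return second_pause_failed, flow._waiter is None


result = asyncio.run(demo())
```

A writer would call `await drain()` after each write, so it only proceeds once the transport has signalled that the buffer is below the low-water mark.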
Clients¶
This section introduces classes implementing the transport/protocol paradigm
for clients with several connections to a remote TcpServer.
Abstract Client¶
Abstract UDP Client¶
Pool¶
-
class
pulsar.async.clients.Pool(creator, pool_size=10, loop=None, timeout=None, **kw)[source]¶ An asynchronous pool of open connections.
Open connections are either in_use or available to be used. Available connections are placed in an asyncio.Queue.
This class is not thread safe.
-
available¶ Number of available connections in the pool.
-
closed¶ True when this pool is closed.
-
connect()[source]¶ Get a connection from the pool.
The connection is either a new one or retrieved from the available connections in the pool.
Returns: a Future resulting in the connection.
-
in_use¶ The number of connections in use.
These connections are not available until they are released back to the pool.
-
pool_size¶ The maximum number of open connections allowed.
If more connections are requested, the request is queued and a connection returned as soon as one becomes available.
-
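The queueing behaviour of a bounded pool can be sketched with an asyncio.Queue. The code below is a simplified model rather than the Pool implementation: `SketchPool`, its `release()` method and the `_open` counter are hypothetical, and timeouts and closing are left out.

```python
import asyncio


class SketchPool:
    """Hypothetical bounded pool; not thread safe, like the original."""

    def __init__(self, creator, pool_size=10):
        self._creator = creator        # coroutine function making connections
        self.pool_size = pool_size
        self._queue = asyncio.Queue()
        self._in_use = set()
        self._open = 0                 # connections created so far

    @property
    def available(self):
        return self._queue.qsize()

    @property
    def in_use(self):
        return len(self._in_use)

    async def connect(self):
        if self._queue.empty() and self._open < self.pool_size:
            self._open += 1            # reserve the slot before awaiting
            try:
                conn = await self._creator()
            except BaseException:
                self._open -= 1
                raise
        else:
            # Reuse an available connection, waiting if the pool is exhausted.
            conn = await self._queue.get()
        self._in_use.add(conn)
        return conn

    def release(self, conn):
        self._in_use.discard(conn)
        self._queue.put_nowait(conn)


async def demo():
    created = []

    async def creator():
        conn = object()                # placeholder for a real connection
        created.append(conn)
        return conn

    pool = SketchPool(creator, pool_size=2)
    first = await pool.connect()
    await pool.connect()
    waiter = asyncio.create_task(pool.connect())   # pool exhausted: queued
    await asyncio.sleep(0)
    queued = not waiter.done()
    pool.release(first)
    third = await waiter
    return len(created), queued, third is first, pool.in_use, pool.available


result = asyncio.run(demo())
```

With `pool_size=2`, the third request does not create a new connection; it waits until one of the two open connections is released.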
Pool Connection¶
-
class
pulsar.async.clients.PoolConnection(pool, connection)[source]¶ A wrapper for a Connection in a connection Pool.
-
pool¶ The Pool which created this PoolConnection.
-
connection¶ The underlying socket connection.
-
close(discard=False)[source]¶ Close this pool connection by releasing the underlying connection back to the pool.
-
detach(discard=True)[source]¶ Remove the underlying connection from the connection pool.
-