Publish/Subscribe
A data Store can implement the pubsub() method to return
a valid PubSub handler.
Channels
Channels is a high-level object that uses a PubSub handler to manage
events on channels. It is useful for:
- Reducing the number of channels to subscribe to by introducing channel events
- Managing registration to and un-registration from channel events
- Handling reconnection with exponential back-off
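
To illustrate the last point, reconnection with exponential back-off can be sketched as below. This is a minimal, self-contained sketch and not pulsar's own implementation: the helper `backoff_delays` and its parameters `base`, `factor` and `max_delay` are hypothetical names chosen for the example.

```python
import itertools


def backoff_delays(base=1.0, factor=2.0, max_delay=30.0):
    """Yield reconnection delays that grow geometrically up to max_delay.

    Hypothetical helper: names and defaults are assumptions made for
    illustration, not part of the pulsar API.
    """
    delay = base
    while True:
        yield min(delay, max_delay)
        delay = min(delay * factor, max_delay)


# First six delays (in seconds) a client would wait between attempts
first_delays = list(itertools.islice(backoff_delays(), 6))
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.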
API
PubSub
class pulsar.apps.data.PubSub(store, protocol=None)
A Publish/Subscribe interface.

A PubSub handler is never initialised directly; instead, the pubsub() method of a data RemoteStore is used.

To listen for messages one adds clients to the handler:

    def do_something(channel, message):
        ...

    pubsub = client.pubsub()
    pubsub.add_client(do_something)
    pubsub.subscribe('mychannel')

You can add as many listening clients as you like. Clients are functions which receive exactly two parameters: the channel sending the message and the message itself.

A PubSub handler can be used to publish messages too:

    pubsub.publish('mychannel', 'Hello')

An additional protocol object can be supplied. The protocol must implement the encode and decode methods.
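
As a rough illustration of a protocol object, the sketch below implements encode and decode on top of JSON. The class name `JsonProtocol` is hypothetical, and the assumption that encode receives the outgoing message while decode receives the raw payload from the store is not stated in this section.

```python
import json


class JsonProtocol:
    """Hypothetical protocol object exposing encode and decode.

    Assumption: encode is applied to messages before publishing and
    decode to payloads received from the store.
    """

    def encode(self, message):
        # Serialise a Python object to a JSON string before publishing
        return json.dumps(message)

    def decode(self, payload):
        # The store may deliver bytes, so normalise to text first
        if isinstance(payload, bytes):
            payload = payload.decode('utf-8')
        return json.loads(payload)


protocol = JsonProtocol()
wire = protocol.encode({'greeting': 'Hello'})
roundtrip = protocol.decode(wire.encode('utf-8'))
```

With such a protocol in place, clients would receive decoded Python objects rather than raw payloads.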
protocol
Protocol of this pubsub handler.

count(*channels)
Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels.

channels(pattern=None)
Lists the currently active channels.

An active channel is a Pub/Sub channel with one or more subscribers (not including clients subscribed to patterns). If no pattern is specified, all channels are listed; otherwise only channels matching the specified glob-style pattern are listed.
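
The effect of the optional pattern can be pictured with the sketch below. It uses Python's `fnmatch` module as a stand-in for glob-style matching; the store's own pattern syntax may differ in details, and `matching_channels` is a hypothetical helper, not part of the PubSub API.

```python
from fnmatch import fnmatchcase


def matching_channels(active, pattern=None):
    """Return the channels selected by an optional glob-style pattern."""
    if pattern is None:
        # No pattern: every active channel is listed
        return list(active)
    return [name for name in active if fnmatchcase(name, pattern)]


active = ['news.sport', 'news.tech', 'chat.lobby']
everything = matching_channels(active)
news_only = matching_channels(active, 'news.*')
```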
 
Channels
class pulsar.apps.data.Channels(store, namespace=None, status_channel=None, logger=None)
Manage channels for publish/subscribe.

close()
Close channels and the underlying store handler.

Returns: a coroutine and therefore it must be awaited

connect(next_time=None)
Connect with the store.

Returns: a coroutine and therefore it must be awaited

publish(channel, event, data=None)
Publish a new event on a channel.

Parameters:
- channel – channel name
- event – event name
- data – optional payload to include in the event

Returns: a coroutine and therefore it must be awaited

register(channel, event, callback)
Register a callback to channel and event.

A prefix is added to the channel name unless the name already carries it or the prefix is an empty string.

Parameters:
- channel – channel name
- event – event name
- callback – callback to execute when event on channel occurs

Returns: a coroutine which results in the channel where the callback was registered
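
One reading of the prefix rule above can be sketched as follows. The helper `prefixed` and the example prefix `'myapp_'` are hypothetical; how the real prefix is derived, and whether a separator is used, is not specified in this section.

```python
def prefixed(name, prefix):
    """Return name with prefix applied at most once.

    Hypothetical helper illustrating the rule, not the pulsar code.
    """
    if not prefix or name.startswith(prefix):
        # Empty prefix, or prefix already present: leave the name alone
        return name
    return prefix + name


added = prefixed('orders', 'myapp_')
unchanged = prefixed('myapp_orders', 'myapp_')
no_prefix = prefixed('orders', '')
```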

statusType
alias of StatusType

unregister(channel, event, callback)
Safely unregister a callback from the list of event callbacks for channel.

Parameters:
- channel – channel name
- event – event name
- callback – callback to remove from the event on channel

Returns: a coroutine which results in the channel object where the callback was removed (if found)
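
The register, publish and unregister flow can be pictured with a toy in-memory stand-in. This sketch is not the pulsar implementation: there is no store or network involved, the class `InMemoryChannels` is hypothetical, and the callback signature `(channel, event, data)` is an assumption made for the example.

```python
import asyncio
from collections import defaultdict


class InMemoryChannels:
    """Toy stand-in mimicking the register/unregister/publish flow."""

    def __init__(self):
        # channel name -> event name -> list of callbacks
        self._callbacks = defaultdict(lambda: defaultdict(list))

    async def register(self, channel, event, callback):
        self._callbacks[channel][event].append(callback)
        return channel

    async def unregister(self, channel, event, callback):
        callbacks = self._callbacks[channel][event]
        if callback in callbacks:  # "safely": a missing callback is ignored
            callbacks.remove(callback)
        return channel

    async def publish(self, channel, event, data=None):
        # Iterate over a copy so callbacks may unregister themselves
        for callback in list(self._callbacks[channel][event]):
            callback(channel, event, data)


async def demo():
    received = []

    def on_created(channel, event, data):
        received.append((channel, event, data))

    channels = InMemoryChannels()
    await channels.register('orders', 'created', on_created)
    await channels.publish('orders', 'created', {'id': 1})
    await channels.unregister('orders', 'created', on_created)
    await channels.unregister('orders', 'created', on_created)  # no error
    await channels.publish('orders', 'created', {'id': 2})  # not delivered
    return received


received = asyncio.run(demo())
```

Grouping callbacks by event under a single channel is what lets one subscription serve many kinds of notification.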
 
