Publish/Subscribe¶
A data Store can implement the pubsub() method to return a valid PubSub handler.
Channels¶
Channels are a high-level object which uses a PubSub handler to manage events on channels. They are useful for:
- Reducing the number of channels to subscribe to by introducing channel events
- Managing registration and un-registration of channel events
- Handling reconnection with exponential back-off
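The reconnection behaviour in the last bullet can be illustrated with a minimal sketch of exponential back-off. The helper name `backoff_delays` and its parameters (`base`, `factor`, `max_delay`) are hypothetical, and the values are illustrative assumptions rather than the ones pulsar uses.

```python
import itertools


def backoff_delays(base=1.0, factor=2.0, max_delay=60.0):
    """Yield an endless sequence of reconnection delays in seconds.

    Hypothetical helper, not part of pulsar: each delay is ``factor``
    times the previous one, capped at ``max_delay``.
    """
    delay = base
    while True:
        yield min(delay, max_delay)
        delay = min(delay * factor, max_delay)


# Delays a reconnecting client would wait before its first 8 attempts
first_delays = list(itertools.islice(backoff_delays(), 8))
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.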
API¶
PubSub¶
- class pulsar.apps.data.PubSub(store, protocol=None)[source]¶
A publish/subscribe interface.
A PubSub handler is never initialised directly; instead, the pubsub() method of a data RemoteStore is used.
To listen for messages one adds clients to the handler:
    def do_something(channel, message):
        ...

    pubsub = client.pubsub()
    pubsub.add_client(do_something)
    pubsub.subscribe('mychannel')
You can add as many listening clients as you like. Clients are functions which receive exactly two parameters: the channel sending the message and the message.
A PubSub handler can be used to publish messages too:
    pubsub.publish('mychannel', 'Hello')
An additional protocol object can be supplied. The protocol must implement the encode and decode methods.
- protocol¶
Protocol of this pubsub handler
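As a rough illustration of the encode and decode requirement, the sketch below defines a JSON-based protocol object. The class name `JsonProtocol` is hypothetical, and exactly when the handler calls each method, and whether payloads arrive as bytes or text, are assumptions made for the example.

```python
import json


class JsonProtocol:
    """Hypothetical protocol object: serialise messages as JSON text."""

    def encode(self, message):
        # Assumed to run on outgoing messages: Python object -> string
        return json.dumps(message)

    def decode(self, message):
        # Assumed to run on incoming messages: accept bytes or str
        if isinstance(message, bytes):
            message = message.decode('utf-8')
        return json.loads(message)


protocol = JsonProtocol()
payload = protocol.encode({'greeting': 'Hello'})
restored = protocol.decode(payload.encode('utf-8'))
```

An object of this shape would presumably be supplied through the protocol argument shown in the class signature above.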
- count(*channels)[source]¶
Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels.
- channels(pattern=None)[source]¶
Lists the currently active channels.
An active channel is a Pub/Sub channel with one or more subscribers (not including clients subscribed to patterns). If no pattern is specified, all the channels are listed; otherwise only channels matching the specified glob-style pattern are listed.
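The pattern rule described above can be sketched with the standard library's `fnmatch` module, which approximates glob-style matching. The helper `match_channels` and the channel names are hypothetical; in practice the store does the filtering, and its glob dialect may differ in details from `fnmatch`.

```python
from fnmatch import fnmatchcase


def match_channels(active, pattern=None):
    """Return the channels from ``active`` matching a glob-style pattern.

    Hypothetical helper for illustration only. With no pattern, every
    active channel is returned.
    """
    if pattern is None:
        return list(active)
    return [name for name in active if fnmatchcase(name, pattern)]


active = ['news.sport', 'news.tech', 'chat']
everything = match_channels(active)
only_news = match_channels(active, 'news.*')
```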
Channels¶
- class pulsar.apps.data.Channels(store, namespace=None, status_channel=None, logger=None)[source]¶
Manage channels for publish/subscribe
- close()[source]¶
Close channels and underlying store handler
Returns: a coroutine, which must therefore be awaited
- connect(next_time=None)[source]¶
Connect with store
Returns: a coroutine, which must therefore be awaited
- publish(channel, event, data=None)[source]¶
Publish a new event on a channel
Parameters:
- channel – channel name
- event – event name
- data – optional payload to include in the event
Returns: a coroutine, which must therefore be awaited
- register(channel, event, callback)[source]¶
Register a callback to channel_name and event.
A prefix is added to the channel name unless it is already present or the prefix is an empty string.
Parameters:
- channel – channel name
- event – event name
- callback – callback to execute when event on channel occurs
Returns: a coroutine which results in the channel where the callback was registered
- statusType¶
alias of StatusType
- unregister(channel, event, callback)[source]¶
Safely unregister a callback from the list of event callbacks for channel_name.
Parameters:
- channel – channel name
- event – event name
- callback – callback to remove
Returns: a coroutine which results in the channel object where the callback was removed (if found)
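The register, publish and unregister flow can be pictured with a small in-memory stand-in. This is a sketch only, not pulsar's implementation: the class `InMemoryChannels` is hypothetical, it needs no store, and the callback signature `(channel, event, data)` is an assumption made for the example.

```python
import asyncio
from collections import defaultdict


class InMemoryChannels:
    """Hypothetical stand-in mimicking register/unregister/publish."""

    def __init__(self):
        # channel name -> event name -> list of callbacks
        self.callbacks = defaultdict(lambda: defaultdict(list))

    async def register(self, channel, event, callback):
        self.callbacks[channel][event].append(callback)
        return channel

    async def unregister(self, channel, event, callback):
        registered = self.callbacks[channel][event]
        if callback in registered:  # safe when the callback is unknown
            registered.remove(callback)
        return channel

    async def publish(self, channel, event, data=None):
        # One channel carries many events; only matching callbacks run
        for callback in list(self.callbacks[channel][event]):
            callback(channel, event, data)


received = []


def on_created(channel, event, data):
    received.append((channel, event, data))


async def main():
    channels = InMemoryChannels()
    await channels.register('users', 'created', on_created)
    await channels.publish('users', 'created', {'id': 1})
    await channels.publish('users', 'deleted', {'id': 1})  # no callback
    await channels.unregister('users', 'created', on_created)
    await channels.publish('users', 'created', {'id': 2})  # ignored
    return received


result = asyncio.run(main())
```

Because events are multiplexed over a single channel name, a client interested in several events still subscribes to one channel only, which is the reduction in subscriptions mentioned in the overview.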