Publish/Subscribe

A data Store can implement the pubsub() method to return a valid PubSub handler.

Channels

Channels is a high-level object that uses a PubSub handler to manage events on channels. It is useful for:

  • Reducing the number of channels to subscribe to by introducing channel events
  • Managing registration and un-registration for channel events
  • Handling reconnection with exponential back-off
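
The reconnection behaviour in the last bullet can be pictured with a minimal sketch of exponential back-off. The helper name backoff_delays and the base, factor and cap values are illustrative assumptions, not pulsar's actual settings.

```python
def backoff_delays(attempts, base=1.0, factor=2.0, cap=60.0):
    """Return the wait (in seconds) before each reconnection attempt.

    Hypothetical helper: the delay grows geometrically and is capped.
    """
    delays = []
    delay = base
    for _ in range(attempts):
        delays.append(min(delay, cap))
        delay *= factor
    return delays


print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.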

API

PubSub

class pulsar.apps.data.PubSub(store, protocol=None)

A publish/subscribe interface.

A PubSub handler is never initialised directly; instead, the pubsub() method of a data RemoteStore is used.

To listen for messages one adds clients to the handler:

def do_something(channel, message):
    ...

pubsub = client.pubsub()
pubsub.add_client(do_something)
pubsub.subscribe('mychannel')

You can add as many listening clients as you like. Clients are callables that receive exactly two parameters: the channel on which the message was sent and the message itself.

A PubSub handler can be used to publish messages too:

pubsub.publish('mychannel', 'Hello')

An additional protocol object can be supplied. The protocol must implement the encode and decode methods.
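
As a rough illustration, a protocol object could look like the sketch below. The class name JsonProtocol and the choice of JSON are assumptions made for the example; the only requirement stated here is that the object provides encode and decode methods.

```python
import json


class JsonProtocol:
    """Hypothetical protocol: serialises messages as JSON text."""

    def encode(self, message):
        # Convert a Python object into the string sent over the channel.
        return json.dumps(message)

    def decode(self, payload):
        # Accept bytes or str, since a store may deliver raw bytes.
        if isinstance(payload, bytes):
            payload = payload.decode('utf-8')
        return json.loads(payload)


protocol = JsonProtocol()
wire = protocol.encode({'event': 'ping', 'count': 1})
print(protocol.decode(wire) == {'event': 'ping', 'count': 1})  # True
```

An instance of such a class would presumably be supplied through the protocol argument shown in the class signature above.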

protocol

Protocol of this pubsub handler

publish_event(channel, event, message)

Publish a new event message to a channel.

publish(channel, message)

Publish a new message to a channel.

count(*channels)

Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels.

channels(pattern=None)

Lists the currently active channels.

An active channel is a Pub/Sub channel with one or more subscribers (not including clients subscribed to patterns). If no pattern is specified, all channels are listed; otherwise only channels matching the glob-style pattern are listed.
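
The filtering rule can be approximated locally with Python's fnmatch module. This is only a sketch: the channel names are made up, the helper matching is hypothetical, and the real matching is performed by the store, whose glob rules may differ in detail.

```python
from fnmatch import fnmatchcase

# Made-up channel names used only for this illustration.
active = ['news.sport', 'news.tech', 'chat.general']


def matching(pattern=None):
    # No pattern: list everything; otherwise keep glob matches only.
    if pattern is None:
        return list(active)
    return [name for name in active if fnmatchcase(name, pattern)]


print(matching())          # ['news.sport', 'news.tech', 'chat.general']
print(matching('news.*'))  # ['news.sport', 'news.tech']
```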

psubscribe(pattern, *patterns)

Subscribe to a list of patterns.

punsubscribe(*channels)

Unsubscribe from a list of patterns.

subscribe(channel, *channels)

Subscribe to a list of channels.

unsubscribe(*channels)

Unsubscribe from a list of channels.

close()

Stop listening for messages.

add_client(client)

Add a new client to the set of all clients.

Clients must be callables accepting two parameters, the channel and the message. When a new message is received from the publisher, the broadcast() method notifies every client by calling it.

remove_client(client)

Remove client from the set of all clients.

broadcast(response)

Broadcast message to all clients.
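
The interplay between add_client(), remove_client() and broadcast() can be pictured with an in-memory stand-in. ToyPubSub below is a hypothetical class, not pulsar's PubSub: its broadcast() takes the channel and message directly, whereas the real method receives a store response.

```python
class ToyPubSub:
    """In-memory stand-in, NOT pulsar's PubSub: it only mimics client dispatch."""

    def __init__(self):
        self._clients = set()

    def add_client(self, client):
        self._clients.add(client)

    def remove_client(self, client):
        # discard() makes removal safe even if the client was never added.
        self._clients.discard(client)

    def broadcast(self, channel, message):
        # Assumed shape: the real broadcast() is given a store response.
        for client in self._clients:
            client(channel, message)


received = []


def listener(channel, message):
    received.append((channel, message))


pubsub = ToyPubSub()
pubsub.add_client(listener)
pubsub.broadcast('mychannel', 'Hello')
pubsub.remove_client(listener)
pubsub.broadcast('mychannel', 'ignored')  # no client left to notify
print(received)  # [('mychannel', 'Hello')]
```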

Channels

class pulsar.apps.data.Channels(store, namespace=None, status_channel=None, logger=None)

Manage channels for publish/subscribe

close()

Close channels and underlying store handler

Returns: a coroutine and therefore it must be awaited

connect(next_time=None)

Connect with store

Returns: a coroutine and therefore it must be awaited

event_pattern(event)

Channel pattern for an event name

publish(channel, event, data=None)

Publish a new event on a channel.

Parameters:
  • channel – channel name
  • event – event name
  • data – optional payload to include in the event
Returns:

a coroutine and therefore it must be awaited

register(channel, event, callback)

Register a callback for an event on a channel.

A prefix is added to the channel name unless the name already carries it or the prefix is an empty string.

Parameters:
  • channel – channel name
  • event – event name
  • callback – callback to execute when event on channel occurs
Returns:

a coroutine which results in the channel where the callback was registered

statusType

alias of StatusType

unregister(channel, event, callback)

Safely unregister a callback from the list of event callbacks for a channel.

Parameters:
  • channel – channel name
  • event – event name
  • callback – callback to remove from the event on the channel
Returns:

a coroutine which results in the channel object where the callback was removed (if found)
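
The register, publish and unregister cycle can be sketched with an in-memory stand-in. ToyChannels below is hypothetical and is not pulsar's Channels: the "_" separator used for the prefix, the callback signature (channel, event, data) and the plain string returned in place of a channel object are all assumptions made for the example. Only the awaitable call shapes follow the signatures documented above.

```python
import asyncio


class ToyChannels:
    """In-memory stand-in, NOT pulsar's Channels; it mirrors the call shapes only."""

    def __init__(self, namespace=''):
        self.namespace = namespace
        self.callbacks = {}  # (channel name, event) -> list of callbacks

    def _name(self, channel):
        # Add the prefix unless it is empty or already present (assumed rule).
        prefix = '%s_' % self.namespace if self.namespace else ''
        if prefix and not channel.startswith(prefix):
            channel = prefix + channel
        return channel

    async def register(self, channel, event, callback):
        name = self._name(channel)
        self.callbacks.setdefault((name, event), []).append(callback)
        return name

    async def unregister(self, channel, event, callback):
        name = self._name(channel)
        listeners = self.callbacks.get((name, event), [])
        if callback in listeners:  # "safely": a missing callback is ignored
            listeners.remove(callback)
        return name

    async def publish(self, channel, event, data=None):
        name = self._name(channel)
        for callback in list(self.callbacks.get((name, event), [])):
            callback(name, event, data)


async def main():
    seen = []

    def on_update(channel, event, data):
        seen.append((channel, event, data))

    channels = ToyChannels(namespace='app')
    name = await channels.register('users', 'update', on_update)
    await channels.publish('users', 'update', {'id': 1})
    await channels.unregister('users', 'update', on_update)
    await channels.publish('users', 'update', {'id': 2})  # no listener left
    return name, seen


name, seen = asyncio.run(main())
print(name, seen)  # app_users [('app_users', 'update', {'id': 1})]
```

Because every method is a coroutine, forgetting to await a call would leave the callback unregistered or the event unpublished, which is why the Returns notes above stress awaiting.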