Source code for examples.chat.manage

'''This example is web-based chat application which exposes three different
:ref:`wsgi routers <wsgi-routing>`:

* A :class:`.Router` to render the web page
* A :class:`.WebSocket` with the :class:`Chat` handler
* A :class:`.Router` with the :class:`Rpc` handler
  for exposing a :ref:`JSON-RPC <apps-rpc>` api.

To run the server::

    python manage.py

and open web browsers at http://localhost:8060

To send messages from the JSON RPC open a python shell and::

    >>> from pulsar.apps import rpc
    >>> p = rpc.JsonProxy('http://127.0.0.1:8060/rpc')
    >>> p.message('Hi from rpc')
    'OK'

This example uses the pulsar :ref:`Publish/Subscribe handler <apps-pubsub>`
to synchronise messages in a multiprocessing web server.

Implementation
===========================

.. autoclass:: Chat
   :members:
   :member-order: bysource

.. autoclass:: Rpc
   :members:
   :member-order: bysource

.. autoclass:: WebChat
   :members:
   :member-order: bysource
'''
import os
import time

from pulsar import create_future, ensure_future
from pulsar.apps.wsgi import (Router, WsgiHandler, LazyWsgi, WSGIServer,
                              GZipMiddleware)
from pulsar.apps.ws import WS, WebSocket
from pulsar.apps.rpc import PulsarServerCommands
from pulsar.apps.data import create_store, PubSubClient
from pulsar.utils.httpurl import JSON_CONTENT_TYPES
from pulsar.apps.ds import pulsards_url
from pulsar.utils.system import json
from pulsar.utils.pep import to_string

CHAT_DIR = os.path.dirname(__file__)


class ChatClient(PubSubClient):
    __slots__ = ('connection', 'channel')

    def __init__(self, connection, channel):
        self.connection = connection
        self.channel = channel

    def __call__(self, channel, message):
        self.connection.write(message)


class Protocol:

    def encode(self, message):
        '''Encode a message when publishing.'''
        if not isinstance(message, dict):
            message = {'message': message}
        message['time'] = time.time()
        return json.dumps(message)

    def decode(self, message):
        return to_string(message)


#    Web Socket Chat handler
[docs]class Chat(WS): '''The websocket handler (:class:`.WS`) managing the chat application. .. attribute:: pubsub The :ref:`publish/subscribe handler <apps-pubsub>` created by the wsgi application in the :meth:`WebChat.setup` method. ''' def __init__(self, pubsub, channel): self.pubsub = pubsub self.channel = channel
[docs] def on_open(self, websocket): '''When a new websocket connection is established it creates a new :class:`ChatClient` and adds it to the set of clients of the :attr:`pubsub` handler.''' self.pubsub.add_client(ChatClient(websocket, self.channel))
[docs] def on_message(self, websocket, msg): '''When a new message arrives, it publishes to all listening clients. ''' if msg: lines = [] for l in msg.split('\n'): l = l.strip() if l: lines.append(l) msg = ' '.join(lines) if msg: return self.pubsub.publish(self.channel, msg)
# RPC MIDDLEWARE To publish messages
[docs]class Rpc(PulsarServerCommands): def __init__(self, pubsub, channel, **kwargs): self.pubsub = pubsub self.channel = channel super().__init__(**kwargs)
[docs] async def rpc_message(self, request, message): '''Publish a message via JSON-RPC''' await self.pubsub.publish(self.channel, message) return 'OK'
[docs]class WebChat(LazyWsgi): '''This is the :ref:`wsgi application <wsgi-handlers>` for this web-chat example.''' def __init__(self, server_name): self.name = server_name
[docs] def setup(self, environ): '''Called once only to setup the WSGI application handler. Check :ref:`lazy wsgi handler <wsgi-lazy-handler>` section for further information. ''' cfg = environ['pulsar.cfg'] loop = environ['pulsar.connection']._loop self.store = create_store(cfg.data_store, loop=loop) pubsub = self.store.pubsub(protocol=Protocol()) channel = '%s_webchat' % self.name ensure_future(pubsub.subscribe(channel), loop=loop) return WsgiHandler([Router('/', get=self.home_page), WebSocket('/message', Chat(pubsub, channel)), Router('/rpc', post=Rpc(pubsub, channel), response_content_types=JSON_CONTENT_TYPES)], [AsyncResponseMiddleware, GZipMiddleware(min_length=20)])
def home_page(self, request): data = open(os.path.join(CHAT_DIR, 'chat.html')).read() request.response.content_type = 'text/html' request.response.content = to_string(data % request.environ) return request.response
def AsyncResponseMiddleware(environ, resp): '''This is just for testing the asynchronous response middleware ''' future = create_future() future._loop.call_soon(future.set_result, resp) return future def server(callable=None, name=None, data_store=None, **params): name = name or 'wsgi' data_store = pulsards_url(data_store) return WSGIServer(callable=WebChat(name), name=name, data_store=data_store, **params) if __name__ == '__main__': # pragma nocover server().start()