Source code for pulsar.apps.data.store

'''
The main component for pulsar datastore clients is the :class:`.Store`
class which encapsulates the essential API for communicating and executing
asynchronous commands on remote servers.
'''
from abc import ABCMeta, abstractmethod
from urllib.parse import urlsplit, parse_qsl, urlunparse, urlencode

from pulsar import ImproperlyConfigured, Producer, EventHandler, ProtocolError
from pulsar.utils.importer import module_attribute
from pulsar.utils.pep import to_string


data_stores = {}


def noop():     # pragma    nocover
    if False:
        yield None


class NoSuchStore(ImproperlyConfigured):
    pass


class Command:
    '''A command executed during a in a :meth:`~.Store.execute_transaction`

    .. attribute:: action

        Type of action:
        * 0 custom command
        * 1 equivalent to an SQL INSERT
        * 2 equivalent to an SQL DELETE
    '''
    __slots__ = ('args', 'action')
    INSERT = 1
    UPDATE = 2
    DELETE = 3

    def __init__(self, args, action=0):
        self.args = args
        self.action = action

    @classmethod
    def insert(cls, args):
        return cls(args, cls.INSERT)


[docs]class Store(metaclass=ABCMeta): '''Base class for an asynchronous :ref:`data stores <data-stores>`. A :class:`.Store` should not be created directly, the high level :func:`.create_store` function should be used instead. .. attribute:: _host The remote host, tuple or string .. attribute:: _user The user name .. attribute:: _password The user password ''' _scheme = None registered = False default_manager = None def __init__(self, name, host, database=None, user=None, password=None, encoding=None, **kw): self._name = name self._host = host self._encoding = encoding or 'utf-8' self._database = database self._user = user self._password = password self._urlparams = {} self._init(**kw) self._dns = self.buildurl() @property def name(self): '''Store name''' return self._name @property def database(self): '''Database name/number associated with this store.''' return self._database @database.setter def database(self, value): self._database = value self._dns = self.buildurl() @property def encoding(self): '''Store encoding (usually ``utf-8``) ''' return self._encoding @property def dns(self): '''Domain name server''' return self._dns @property def urlparams(self): """url parameters in dns query""" return self._urlparams @classmethod def register(cls): pass def __str__(self): return self._dns def __repr__(self): return 'Store(dns="%s")' % self
[docs] def database_create(self, dbname=None, **kw): '''Create a new database in this store. By default it does nothing, stores must implement this method only if they support database creation. :param dbname: optional database name. If not supplied a database with :attr:`database` is created. ''' return noop()
def database_all(self, dbname=None): return noop()
[docs] def table_all(self, **kw): '''Information about the table/collection mapping ``model`` ''' return noop()
def table_index_create(self, table_name, index, **kw): return noop() def table_index_drop(self, table_name, index, **kw): return noop() def table_index_all(self, table_name, **kw): return noop() # INTERNALS ####################### def _init(self, **kw): # pragma nocover '''Internal initialisation''' pass def buildurl(self, **kw): pre = '' if self._user: if self._password: pre = '%s:%s@' % (self._user, self._password) else: pre = '%s@' % self._user elif self._password: raise ImproperlyConfigured('password but not user') assert self._password host = self._host if isinstance(host, tuple): host = '%s:%s' % host host = '%s%s' % (pre, host) path = '/%s' % self._database if self._database else '' self._urlparams.update(kw) query = urlencode(self._urlparams) scheme = self._name if self._scheme: scheme = '%s+%s' % (self._scheme, scheme) if not host: path = '//%s' % path return urlunparse((scheme, host, path, '', query, ''))
[docs]class RemoteStore(Producer, Store): '''Base class for remote :ref:`data stores <data-stores>`. It is an :class:`.Producer` for accessing and retrieving data from remote data servers such as redis. ''' MANY_TIMES_EVENTS = ('request',) def __init__(self, name, host, loop=None, protocol_factory=None, **kw): super().__init__(loop, protocol_factory=protocol_factory) Store.__init__(self, name, host, **kw) @abstractmethod
[docs] def connect(self): '''Connect with store server '''
[docs] def execute(self, *args, **options): '''Execute a command ''' raise NotImplementedError
[docs] def ping(self): '''Used to check if the data server is available ''' raise NotImplementedError
[docs] def client(self): '''Get a client for this store if implemented ''' raise NotImplementedError
[docs] def pubsub(self, **kw): '''Obtain a :class:`.PubSub` handler for this store if implemented ''' raise NotImplementedError
[docs] def channels(self, **kw): '''Obtain a :class:`.Channels` handler for this store if implemented ''' raise NotImplementedError
[docs] def close(self): '''Close all open connections ''' raise NotImplementedError
[docs] def flush(self): '''Flush the store.''' raise NotImplementedError
# encode/decode field values
[docs] def encode_bytes(self, data): '''Encode bytes ``data`` :param data: a bytes string :return: bytes or string ''' return data
[docs] def dencode_bytes(self, data): '''Decode bytes ``data`` :param data: bytes or string :return: bytes ''' return data
def encode_bool(self, data): return bool(data) def encode_json(self, data): return data
class PubSubClient: '''Interface for a client of :class:`.PubSub` handler. Instances of this :class:`Client` are callable object and are called once a new message has arrived from a subscribed channel. The callable accepts two parameters: * ``channel`` the channel which originated the message * ``message`` the message ''' def __call__(self, channel, message): raise NotImplementedError
[docs]class PubSub(EventHandler): '''A Publish/Subscriber interface. A :class:`.PubSub` handler is never initialised directly, instead, the :meth:`~.RemoteStore.pubsub` method of a data :class:`.RemoteStore` is used. To listen for messages one adds clients to the handler:: def do_somethind(channel, message): ... pubsub = client.pubsub() pubsub.add_client(do_somethind) pubsub.subscribe('mychannel') You can add as many listening clients as you like. Clients are functions which receive two parameters only, the ``channel`` sending the message and the ``message``. A :class:`PubSub` handler can be used to publish messages too:: pubsub.publish('mychannel', 'Hello') An additional ``protocol`` object can be supplied. The protocol must implement the ``encode`` and ``decode`` methods. ''' MANY_TIMES_EVENTS = ('connection_lost',) def __init__(self, store, protocol=None): super().__init__(loop=store._loop) self.store = store self._protocol = protocol self._connection = None self._clients = set() def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.store) __str__ = __repr__ @property def protocol(self): """Protocol of this pubsub handler """ return self._protocol
[docs] def publish_event(self, channel, event, message): '''Publish a new event ``message`` to a ``channel``. ''' assert self._protocol is not None, "Protocol required" msg = {'event': event, 'channel': channel} if message: msg['data'] = message return self.publish(channel, msg)
[docs] def publish(self, channel, message): '''Publish a new ``message`` to a ``channel``. ''' raise NotImplementedError
[docs] def count(self, *channels): '''Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels. ''' raise NotImplementedError
[docs] def channels(self, pattern=None): '''Lists the currently active channels. An active channel is a Pub/Sub channel with one ore more subscribers (not including clients subscribed to patterns). If no ``pattern`` is specified, all the channels are listed, otherwise if ``pattern`` is specified only channels matching the specified glob-style pattern are listed. ''' raise NotImplementedError
[docs] def psubscribe(self, pattern, *patterns): '''Subscribe to a list of ``patterns``. ''' raise NotImplementedError
[docs] def punsubscribe(self, *channels): '''Unsubscribe from a list of ``patterns``. ''' raise NotImplementedError
[docs] def subscribe(self, channel, *channels): '''Subscribe to a list of ``channels``. ''' raise NotImplementedError
[docs] def unsubscribe(self, *channels): '''Un-subscribe from a list of ``channels``. ''' raise NotImplementedError
[docs] def close(self): '''Stop listening for messages. ''' raise NotImplementedError
[docs] def add_client(self, client): '''Add a new ``client`` to the set of all :attr:`clients`. Clients must be callable accepting two parameters, the channel and the message. When a new message is received from the publisher, the :meth:`broadcast` method will notify all :attr:`clients` via the ``callable`` method.''' self._clients.add(client)
def __contains__(self, client): return client in self._clients
[docs] def remove_client(self, client): '''Remove *client* from the set of all :attr:`clients`.''' self._clients.discard(client)
# INTERNALS
[docs] def broadcast(self, response): '''Broadcast ``message`` to all :attr:`clients`.''' remove = set() channel = to_string(response[0]) message = response[1] if self._protocol: try: message = self._protocol.decode(message) except ProtocolError: self.logger.exception('Could not decode message') return for client in self._clients: try: client(channel, message) except IOError: remove.add(client) except Exception: self._loop.logger.exception( 'Exception while processing pub/sub client. Removing it.') remove.add(client) self._clients.difference_update(remove)
def parse_store_url(url): assert url, 'No url given' scheme, host, path, query, fr = urlsplit(url) assert not fr, 'store url must not have fragment, found %s' % fr assert scheme, 'Scheme not provided' # pulsar:// if scheme == 'pulsar' and not host: host = '127.0.0.1:0' bits = host.split('@') assert len(bits) <= 2, 'Too many @ in %s' % url params = dict(parse_qsl(query)) if path: database = path[1:] assert '/' not in database, 'Unsupported database %s' % database params['database'] = database if len(bits) == 2: userpass, host = bits userpass = userpass.split(':') assert len(userpass) <= 2,\ 'User and password not in user:password format' params['user'] = userpass[0] if len(userpass) == 2: params['password'] = userpass[1] if ':' in host: host = tuple(host.split(':')) host = host[0], int(host[1]) return scheme, host, params
[docs]def create_store(url, **kw): '''Create a new :class:`Store` for a valid ``url``. :param url: a valid ``url`` takes the following forms: :ref:`Pulsar datastore <store_pulsar>`:: pulsar://user:password@127.0.0.1:6410 :ref:`Redis <store_redis>`:: redis://user:password@127.0.0.1:6500/11?namespace=testdb :param kw: additional key-valued parameters to pass to the :class:`.Store` initialisation method. It can contains parameters such as ``database``, ``user`` and ``password`` to override the ``url`` values. Additional parameters are processed by the :meth:`.Store._init` method. :return: a :class:`Store`. ''' if isinstance(url, Store): return url scheme, address, params = parse_store_url(url) dotted_path = data_stores.get(scheme) if not dotted_path: raise NoSuchStore('%s store not available' % scheme) store_class = module_attribute(dotted_path, safe=True) if not store_class: raise ImproperlyConfigured('"%s" store not available' % dotted_path) if not store_class.registered: store_class.registered = True store_class.register() params.update(kw) return store_class(scheme, address, **params)
[docs]def register_store(name, dotted_path): '''Register a new :class:`.Store` with schema ``name`` which can be found at the python ``dotted_path``. ''' data_stores[name] = dotted_path