Source code for pulsar.apps.data.redis.client

from itertools import chain
import datetime

import pulsar
from pulsar.utils.pep import to_string
from pulsar.utils.structures import mapping_iterator, Zset
from pulsar.apps.ds import COMMANDS_INFO, CommandError

from .pubsub import RedisPubSub
from .lock import Lock

str_or_bytes = (bytes, str)

INVERSE_COMMANDS_INFO = dict(((i.method_name, i.name)
                              for i in COMMANDS_INFO.values()))


class Executor:
    __slots__ = ('client', 'command')

    def __init__(self, client, command):
        self.client = client
        self.command = command

    def __call__(self, *args, **options):
        return self.client.execute(self.command, *args, **options)


class ResponseError:
    __slots__ = ('exception',)

    def __init__(self, exception):
        self.exception = exception


def dict_merge(*dicts):
    merged = {}
    [merged.update(d) for d in dicts]
    return merged


def pairs_to_object(response, factory=None):
    it = iter(response)
    return (factory or dict)(zip(it, it))


def values_to_object(response, fields=None, factory=None):
    if fields is not None:
        return (factory or dict)(zip(fields, response))
    else:
        return response


def string_keys_to_dict(key_string, callback):
    return dict.fromkeys(key_string.split(), callback)


def parse_info(response):
    info = {}
    response = to_string(response)

    def get_value(value):
        if ',' not in value or '=' not in value:
            try:
                if '.' in value:
                    return float(value)
                else:
                    return int(value)
            except ValueError:
                return value
        else:
            sub_dict = {}
            for item in value.split(','):
                k, v = item.rsplit('=', 1)
                sub_dict[k] = get_value(v)
            return sub_dict

    for line in response.splitlines():
        if line and not line.startswith('#'):
            key, value = line.split(':', 1)
            info[key] = get_value(value)
    return info


def values_to_zset(response, withscores=False, **kw):
    if withscores:
        it = iter(response)
        return Zset(((float(score), value) for value, score in zip(it, it)))
    else:
        return response


def sort_return_tuples(response, groups=None, **options):
    """
    If ``groups`` is specified, return the response as a list of
    n-element tuples with n being the value found in options['groups']
    """
    if not response or not groups:
        return response
    return list(zip(*[response[i::groups] for i in range(groups)]))


def pubsub_callback(response, subcommand=None):
    if subcommand == 'numsub':
        it = iter(response)
        return dict(((k, int(v)) for k, v in zip(it, it)))
        return pairs_to_object(response)
    elif subcommand == 'numpat':
        return int(response)
    else:
        return response


class Consumer(pulsar.ProtocolConsumer):

    RESPONSE_CALLBACKS = dict_merge(
        string_keys_to_dict(
            'BGSAVE FLUSHALL FLUSHDB HMSET LSET LTRIM MSET RENAME RESTORE '
            'SAVE SELECT SHUTDOWN SLAVEOF SET WATCH UNWATCH',
            lambda r: r == b'OK'
        ),
        string_keys_to_dict('SORT', sort_return_tuples),
        string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None),
        string_keys_to_dict('SMEMBERS SDIFF SINTER SUNION', set),
        string_keys_to_dict('INCRBYFLOAT HINCRBYFLOAT ZINCRBY ZSCORE',
                            lambda v: float(v) if v is not None else v),
        string_keys_to_dict('ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE',
                            values_to_zset),
        string_keys_to_dict('EXISTS EXPIRE EXPIREAT PEXPIRE PEXPIREAT '
                            'PERSIST RENAMENX',
                            lambda r: bool(r)),
        {
            'PING': lambda r: r == b'PONG',
            'PUBSUB': pubsub_callback,
            'INFO': parse_info,
            'TIME': lambda x: (int(float(x[0])), int(float(x[1]))),
            'HGETALL': pairs_to_object,
            'HMGET': values_to_object,
            'TYPE': lambda r: r.decode('utf-8')
        }
    )

    def start_request(self):
        conn = self._connection
        args = self._request[0]
        if len(self._request) == 2:
            chunk = conn.parser.pack_command(args)
        else:
            chunk = conn.parser.pack_pipeline(args)
        conn._transport.write(chunk)

    def parse_response(self, response, command, options):
        callback = self.RESPONSE_CALLBACKS.get(command.upper())
        return callback(response, **options) if callback else response

    def data_received(self, data):
        conn = self._connection
        parser = conn.parser
        parser.feed(data)
        response = parser.get()
        request = self._request
        try:
            if len(request) == 2:
                if response is not False:
                    if not isinstance(response, Exception):
                        cmnd = request[0][0]
                        response = self.parse_response(response, cmnd,
                                                       request[1])
                    else:
                        response = ResponseError(response)
                    self.finished(response)
            else:   # pipeline
                commands, raise_on_error, responses = request
                while response is not False:
                    responses.append(response)
                    response = parser.get()
                if len(responses) == len(commands):
                    error = None
                    result = responses[-1]
                    response = []
                    if isinstance(result, Exception):
                        error = result
                        result = responses[1:-1]
                    for cmds, resp in zip(commands[1:-1], result):
                        args, options = cmds
                        if isinstance(resp, Exception) and not error:
                            error = resp
                        resp = self.parse_response(resp, args[0], options)
                        response.append(resp)
                    if error and raise_on_error:
                        response = ResponseError(error)
                    self.finished(response)
        except Exception as exc:
            self.finished(exc=exc)


[docs]class RedisClient: '''Client for :class:`.RedisStore`. .. attribute:: store The :class:`.RedisStore` for this client. ''' def __init__(self, store): self.store = store def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.store) __str__ = __repr__ @property def _loop(self): return self.store._loop def pubsub(self, **kw): return RedisPubSub(self.store, **kw)
[docs] def pipeline(self): '''Create a :class:`.Pipeline` for pipelining commands ''' return Pipeline(self.store)
def execute(self, command, *args, **options): return self.store.execute(command, *args, **options) execute_command = execute immediate_execute = execute # special commands # STRINGS def decrby(self, key, ammount=None): if ammount is None: return self.execute('decr', key) else: return self.execute('decrby', key, ammount) decr = decrby def incrby(self, key, ammount=None): if ammount is None: return self.execute('incr', key) else: return self.execute('incrby', key, ammount) incr = incrby def incrbyfloat(self, key, ammount=None): if ammount is None: ammount = 1 return self.execute('incrbyfloat', key, ammount)
[docs] def set(self, name, value, ex=None, px=None, nx=False, xx=False): """Set the value at key ``name`` to ``value`` :param ex: sets an expire flag on key ``name`` for ``ex`` seconds. :param px: sets an expire flag on key ``name`` for ``px`` milliseconds. :param nx: if set to True, set the value at key ``name`` to ``value`` if it does not already exist. :param xx: if set to True, set the value at key ``name`` to ``value`` if it already exists. """ pieces = [name, value] if ex: pieces.append('EX') if isinstance(ex, datetime.timedelta): ex = ex.seconds + ex.days * 24 * 3600 pieces.append(ex) if px: pieces.append('PX') if isinstance(px, datetime.timedelta): ms = int(px.microseconds / 1000) px = (px.seconds + px.days * 24 * 3600) * 1000 + ms pieces.append(px) if nx: pieces.append('NX') if xx: pieces.append('XX') return self.execute('set', *pieces)
# HASHES def hmget(self, key, *fields): return self.execute('hmget', key, *fields, fields=fields) def hmset(self, key, iterable): args = [] [args.extend(pair) for pair in mapping_iterator(iterable)] return self.execute('hmset', key, *args) # LISTS def blpop(self, keys, timeout=0): if timeout is None: timeout = 0 if isinstance(keys, str_or_bytes): keys = [keys] else: keys = list(keys) keys.append(timeout) return self.execute_command('BLPOP', *keys) def brpop(self, keys, timeout=0): if timeout is None: timeout = 0 if isinstance(keys, str_or_bytes): keys = [keys] else: keys = list(keys) keys.append(timeout) return self.execute_command('BRPOP', *keys) def brpoplpush(self, src, dst, timeout=0): if timeout is None: timeout = 0 return self.execute_command('BRPOPLPUSH', src, dst, timeout) # SORTED SETS
[docs] def zadd(self, name, *args, **kwargs): """ Set any number of score, element-name pairs to the key ``name``. Pairs can be specified in two ways: As ``*args``, in the form of:: score1, name1, score2, name2, ... or as ``**kwargs``, in the form of:: name1=score1, name2=score2, ... The following example would add four values to the 'my-key' key:: client.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4) """ pieces = [] if args: if len(args) % 2 != 0: raise ValueError("ZADD requires an equal number of " "values and scores") pieces.extend(args) for pair in kwargs.items(): pieces.append(pair[1]) pieces.append(pair[0]) return self.execute_command('ZADD', name, *pieces)
def zinterstore(self, des, keys, weights=None, aggregate=None): numkeys = len(keys) pieces = list(keys) if weights: pieces.append(b'WEIGHTS') pieces.extend(weights) if aggregate: pieces.append(b'AGGREGATE') pieces.append(aggregate) return self.execute_command('ZINTERSTORE', des, numkeys, *pieces) def zunionstore(self, des, keys, weights=None, aggregate=None): numkeys = len(keys) pieces = list(keys) if weights: pieces.append(b'WEIGHTS') pieces.extend(weights) if aggregate: pieces.append(b'AGGREGATE') pieces.append(aggregate) return self.execute_command('ZUNIONSTORE', des, numkeys, *pieces) def zrange(self, key, start, stop, withscores=False): if withscores: return self.execute_command('ZRANGE', key, start, stop, b'WITHSCORES', withscores=True) else: return self.execute_command('ZRANGE', key, start, stop) def zrangebyscore(self, key, min, max, withscores=False, offset=None, count=None): pieces = [] if withscores: pieces.append(b'WITHSCORES') if offset: pieces.append(b'LIMIT') pieces.append(offset) pieces.append(count) return self.execute_command('ZRANGEBYSCORE', key, min, max, *pieces, withscores=withscores) def zrevrange(self, key, start, stop, withscores=False): if withscores: return self.execute_command('ZREVRANGE', key, start, stop, 'WITHSCORES', withscores=True) else: return self.execute_command('ZRANGE', key, start, stop) def zrevrangebyscore(self, key, min, max, withscores=False, offset=None, count=None): pieces = [] if withscores: pieces.append(b'WITHSCORES') if offset: pieces.append(b'LIMIT') pieces.append(offset) pieces.append(count) return self.execute_command('ZREVRANGEBYSCORE', key, min, max, *pieces, withscores=withscores) def eval(self, script, keys=None, args=None): return self._eval('eval', script, keys, args) def evalsha(self, sha, keys=None, args=None): return self._eval('evalsha', sha, keys, args)
[docs] def sort(self, key, start=None, num=None, by=None, get=None, desc=False, alpha=False, store=None, groups=False): '''Sort and return the list, set or sorted set at ``key``. ``start`` and ``num`` allow for paging through the sorted data ``by`` allows using an external key to weight and sort the items. Use an "*" to indicate where in the key the item value is located ``get`` allows for returning items from external keys rather than the sorted data itself. Use an "*" to indicate where int he key the item value is located ``desc`` allows for reversing the sort ``alpha`` allows for sorting lexicographically rather than numerically ``store`` allows for storing the result of the sort into the key ``store`` ``groups`` if set to True and if ``get`` contains at least two elements, sort will return a list of tuples, each containing the values fetched from the arguments to ``get``. ''' if ((start is not None and num is None) or (num is not None and start is None)): raise CommandError("``start`` and ``num`` must both be specified") pieces = [key] if by is not None: pieces.append('BY') pieces.append(by) if start is not None and num is not None: pieces.append('LIMIT') pieces.append(start) pieces.append(num) if get is not None: # If get is a string assume we want to get a single value. # Otherwise assume it's an interable and we want to get multiple # values. We can't just iterate blindly because strings are # iterable. if isinstance(get, str): pieces.append('GET') pieces.append(get) else: for g in get: pieces.append('GET') pieces.append(g) if desc: pieces.append('DESC') if alpha: pieces.append('ALPHA') if store is not None: pieces.append('STORE') pieces.append(store) if groups: if not get or isinstance(get, str) or len(get) < 2: raise CommandError('when using "groups" the "get" argument ' 'must be specified and contain at least ' 'two keys') options = {'groups': len(get) if groups else None} return self.execute_command('SORT', *pieces, **options)
def lock(self, name, **kw): return Lock(self, name, **kw) def __getattr__(self, name): command = INVERSE_COMMANDS_INFO.get(name) if command: return Executor(self, command) else: raise AttributeError("'%s' object has no attribute '%s'" % (type(self), name)) def _eval(self, command, script, keys, args): all_args = keys if keys is not None else () num_keys = len(all_args) if args: all_args = tuple(chain(all_args, args)) return self.execute(command, script, num_keys, *all_args)
[docs]class Pipeline(RedisClient): '''A :class:`.RedisClient` for pipelining commands ''' def __init__(self, store): self.store = store self.reset() def execute(self, *args, **kwargs): self.command_stack.append((args, kwargs)) execute_command = execute def reset(self): self.command_stack = []
[docs] def commit(self, raise_on_error=True): '''Send commands to redis. ''' cmds = list(chain([(('multi',), {})], self.command_stack, [(('exec',), {})])) self.reset() return self.store.execute_pipeline(cmds, raise_on_error)
def immediate_execute(self, command, *args, **options): return self.store.execute(command, *args, **options)