from .skiplist import Skiplist
[docs]class Zset:
'''Ordered-set equivalent of redis zset.
'''
def __init__(self, data=None):
self._sl = Skiplist()
self._dict = {}
if data:
self.update(data)
def __repr__(self):
return repr(self._sl)
__str__ = __repr__
def __len__(self):
return len(self._dict)
def __iter__(self):
for _, value in self._sl:
yield value
def __getstate__(self):
return self._dict
def __setstate__(self, state):
self._dict = state
self._sl = Skiplist(((score, member) for member, score
in state.items()))
def __eq__(self, other):
if isinstance(other, Zset):
return other._dict == self._dict
return False
[docs] def items(self):
'''Iterable over ordered score, value pairs of this :class:`zset`
'''
return iter(self._sl)
def range(self, start, end, scores=False):
return self._sl.range(start, end, scores)
def range_by_score(self, minval, maxval, include_min=True,
include_max=True, start=0, num=None, scores=False):
return self._sl.range_by_score(minval, maxval, start=start,
num=num, include_min=include_min,
include_max=include_max,
scores=scores)
[docs] def score(self, member, default=None):
'''The score of a given member'''
return self._dict.get(member, default)
def count(self, minval, maxval, include_min=True, include_max=True):
return self._sl.count(minval, maxval, include_min, include_max)
def add(self, score, val):
r = 1
if val in self._dict:
sc = self._dict[val]
if sc == score:
return 0
self.remove(val)
r = 0
self._dict[val] = score
self._sl.insert(score, val)
return r
[docs] def update(self, score_vals):
'''Update the :class:`zset` with an iterable over pairs of
scores and values.'''
add = self.add
for score, value in score_vals:
add(score, value)
def remove_items(self, items):
removed = 0
for item in items:
score = self.remove(item)
if score is not None:
removed += 1
return removed
[docs] def remove(self, item):
'''Remove ``item`` for the :class:`zset` it it exists.
If found it returns the score of the item removed.
'''
score = self._dict.pop(item, None)
if score is not None:
index = self._sl.rank(score)
assert index >= 0, 'could not find start range'
for i, v in enumerate(self._sl.range(index)):
if v == item:
assert self._sl.remove_range(index + i, index+i + 1) == 1
return score
assert False, 'could not find element'
[docs] def remove_range(self, start, end):
'''Remove a range by score.
'''
return self._sl.remove_range(
start, end, callback=lambda sc, value: self._dict.pop(value))
[docs] def remove_range_by_score(self, minval, maxval,
include_min=True, include_max=True):
'''Remove a range by score.
'''
return self._sl.remove_range_by_score(
minval, maxval, include_min=include_min, include_max=include_max,
callback=lambda sc, value: self._dict.pop(value))
[docs] def clear(self):
'''Clear this :class:`zset`.'''
self._sl = Skiplist()
self._dict.clear()
[docs] def rank(self, item):
'''Return the rank (index) of ``item`` in this :class:`zset`.'''
score = self._dict.get(item)
if score is not None:
return self._sl.rank(score)
def flat(self):
return self._sl.flat()
@classmethod
def union(cls, zsets, weights, oper):
result = None
for zset, weight in zip(zsets, weights):
if result is None:
result = cls()
sl = result._sl
for score, value in zset._sl:
result.add(score*weight, value)
else:
for score, value in zset._sl:
score *= weight
existing = sl.score(value)
if existing is not None:
score = oper(score, existing)
result.add(score, value)
return result
@classmethod
def inter(cls, zsets, weights, oper):
result = None
values = None
for zset, _ in zip(zsets, weights):
if values is None:
values = set(zset)
else:
values.intersection_update(zset)
#
for zset, weight in zip(zsets, weights):
if result is None:
result = cls()
for score, value in zset._sl:
if value in values:
result.add(score*weight, value)
else:
for score, value in zset._sl:
if value in values:
existing = result.score(value)
score = oper((score*weight, existing))
result.add(score, value)
return result