Source code for pulsar.async.lock
import asyncio
from abc import ABC, abstractmethod
from ..utils.exceptions import LockError
[docs]class LockBase(ABC):
"""A asynchronous locking primitive associated to a given name.
An asynchronous lock is in one of two states, 'locked' or 'unlocked'.
It is created in the unlocked state. It has two basic methods,
:meth:`.acquire` and :meth:`.release. When the state is unlocked,
:meth:`.acquire` changes the state to locked and returns immediately.
When the state is locked, :meth:`.acquire` wait
until a call to :meth:`.release` changes it to unlocked,
then the :meth:`.acquire` call resets it to locked and returns.
.. attribute:: blocking
The time to wait for the lock to be free when acquiring it.
When False it does not block, when True it blocks forever,
when a positive number blocks for ``blocking`` seconds.
.. attribute:: timeout
Free the lock after timeout seconds. If timeout is None (default)
does not free the lock until ``release`` is called.
"""
def __init__(self, name, *, loop=None, timeout=None, blocking=True):
self.name = name
self.timeout = timeout
self.blocking = blocking
self._loop = loop or asyncio.get_event_loop()
@abstractmethod
[docs] def locked(self): # pragma nocover
"""Return True if the lock is acquired
"""
raise
@abstractmethod
[docs] async def acquire(self): # pragma nocover
"""Try to acquire the lock
"""
raise
@abstractmethod
[docs] async def release(self): # pragma nocover
"""Release the lock
"""
raise
async def __aenter__(self):
acquired = await self.acquire()
if not acquired:
raise LockError('Could not acquire lock')
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.locked():
await self.release()
[docs]class Lock(LockBase):
"""An asynchronous lock
"""
def __init__(self, name, *, loop=None, timeout=None, blocking=True):
super().__init__(name, loop=loop, timeout=timeout, blocking=blocking)
self._lock = _get_lock(self._loop, name)
self._timeout_handler = None
self._locked = False
def locked(self):
return self._locked
async def acquire(self):
try:
timeout = self.blocking
if timeout is True:
timeout = None
elif timeout is False:
timeout = 0
await asyncio.wait_for(self._lock.acquire(), timeout=timeout)
self._schedule_timeout()
self._locked = True
except asyncio.TimeoutError:
self._locked = False
return self._locked
async def release(self):
self._cancel_lock()
def _schedule_timeout(self):
if self.timeout is not None:
self._timeout_handler = self._loop.call_later(
self.timeout, self._cancel_lock
)
def _cancel_lock(self):
try:
self._lock.release()
except Exception:
pass
self._locked = False
if self._timeout_handler:
self._timeout_handler.cancel()
self._timeout_handler = None
def _get_lock(loop, name):
if not hasattr(loop, '_locks'):
loop._locks = {}
if name not in loop._locks:
loop._locks[name] = asyncio.Lock(loop=loop)
return loop._locks[name]