# Source code for pulsar.apps.data.pulsards.startds
import asyncio
from pulsar import when_monitor_start, get_application, send
from pulsar.apps.data import create_store
from pulsar.apps.ds import PulsarDS
async def start_pulsar_ds(arbiter, host, workers=0):
    '''Return the configuration of the ``pulsards`` application, creating
    the application first when :func:`get_application` does not find it.

    Callers are serialised through an :class:`asyncio.Lock` cached on the
    ``arbiter`` object as ``arbiter.lock``.

    :param arbiter: object holding the shared lock; it is also the argument
        the newly created :class:`.PulsarDS` is called with.
    :param host: passed to :class:`.PulsarDS` as ``bind``.
    :param workers: passed to :class:`.PulsarDS` as ``workers``.
    :return: ``app.cfg`` of the existing application, otherwise the result
        of awaiting ``app(arbiter)`` on the new one.
    '''
    lock = getattr(arbiter, 'lock', None)
    if lock is None:
        # Created lazily on first use and stored for subsequent callers.
        arbiter.lock = lock = asyncio.Lock()
    # ``async with`` releases the lock on every exit path, including errors.
    async with lock:
        app = await get_application('pulsards')
        if app:
            return app.cfg
        app = PulsarDS(bind=host, workers=workers, load_config=False)
        return await app(arbiter)
async def start_store(app, url, workers=0, **kw):
    '''Equivalent to :func:`.create_store` for most cases, except when the
    ``url`` is for a pulsar store not yet started.
    In this case, a :class:`.PulsarDS` is started.

    The ``dns`` of the resulting store is written to the ``data_store``
    setting of ``app.cfg``; nothing is returned.

    :param app: application whose ``cfg`` receives the ``data_store`` value.
    :param url: store url handed to :func:`.create_store`.
    :param workers: forwarded to :func:`start_pulsar_ds` when a server has
        to be started.
    :param kw: extra keyword arguments for :func:`.create_store`.
    :raises ConnectionRefusedError: when the ping is refused and
        :func:`localhost` returns a false value for the store host.
    '''
    store = create_store(url, **kw)
    if store.name == 'pulsar':
        client = store.client()
        try:
            await client.ping()
        except ConnectionRefusedError:
            host = localhost(store._host)
            if not host:
                # localhost() gave nothing to bind to: propagate the refusal.
                raise
            # Ask the arbiter to run start_pulsar_ds and wait for its config.
            cfg = await send('arbiter', 'run', start_pulsar_ds,
                             host, workers)
            # Rebuild the store against the first address in the config.
            store._host = cfg.addresses[0]
            dns = store.buildurl()
            store = create_store(dns, **kw)
    app.cfg.set('data_store', store.dns)
def localhost(host):
    '''Return a bind string for ``host`` when it designates the local
    machine, otherwise a false value.

    :param host: either a tuple (its first item is compared against
        ``'127.0.0.1'`` and ``''``) or any other object.
    :return: for a tuple whose first item matches, all items joined with
        ``':'``; ``None`` for any other tuple, including an empty one;
        a non-tuple ``host`` is returned unchanged.
    '''
    if not isinstance(host, tuple):
        return host
    # ``host and`` guards the index so an empty tuple yields None instead of
    # raising IndexError.
    if host and host[0] in ('127.0.0.1', ''):
        return ':'.join(str(part) for part in host)
    return None
def _start_store(monitor):
    '''Call :func:`start_store` for the monitor's application when that
    application is not a :class:`.PulsarDS` and has a truthy ``data_store``
    setting; otherwise return ``None``.
    '''
    application = monitor.app
    if isinstance(application, PulsarDS):
        return None
    store_url = application.cfg.data_store
    if not store_url:
        return None
    return start_store(application, store_url)
# Register the hook on pulsar's ``when_monitor_start`` collection; presumably
# each entry is called with a monitor as it starts -- TODO confirm.
when_monitor_start.append(_start_store)