Documentation for pulsar's DEVELOPMENT version.

Distributed Task Queue

Pulsar ships with an asynchronous TaskQueue for distributing work across threads, processes or machines. It is highly customisable: it can run in multi-threading or multi-processing (default) mode and can share tasks across several machines.

You create Job classes much as you would with celery, and this application provides everything needed to run them with very little setup effort:

from pulsar.apps import tasks

if __name__ == '__main__':
    tasks.TaskQueue(task_paths=['path.to.tasks.*']).start()

Check the task queue tutorial for a running example with simple tasks.

Getting Started

To get started, follow these guidelines:

  • Create a script which runs your application; in the taskqueue tutorial the script is called manage.py.
  • Create the modules where jobs are implemented. These can be a directory containing several submodules, as explained in the task_paths parameter.
  • Run your script, sit back and relax.

Configuration

A TaskQueue accepts several configuration parameters on top of the standard application settings:

  • The task_paths parameter specifies a list of Python paths from which Job classes are collected:

    task_paths = ['myjobs','another.moduledir.*']
    

    The * at the end of the second path instructs the application to collect Job classes from all submodules of another.moduledir.

  • The schedule_periodic flag indicates whether the TaskQueue can schedule PeriodicJob tasks. Usually, only one running TaskQueue application is responsible for scheduling periodic tasks.

    It can be specified in the command line via the --schedule-periodic flag.

    Default: False.

  • The task_backend parameter is a URL string which specifies the task backend to use.

    It can be specified in the command line via the --task-backend ... option.

    Default: local://.

  • The concurrent_tasks parameter controls the maximum number of concurrent tasks for a given task worker. This parameter is important when tasks are asynchronous, that is, when they perform some sort of I/O and the job callable returns an asynchronous component.

    It can be specified in the command line via the --concurrent-tasks ... option.

    Default: 5.
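The wildcard rule for task_paths can be illustrated with the standard library. The sketch below is not pulsar's loader: expand_task_paths is a hypothetical helper, assuming a trailing .* means "every direct submodule of the package", and the stdlib json package stands in for a package of job modules.

```python
import importlib
import pkgutil


def expand_task_paths(task_paths):
    """Hypothetical helper: turn task_paths entries into module names.

    A plain dotted path is kept as-is; a path ending in '.*' is replaced
    by the names of all direct submodules of that package.
    """
    modules = []
    for path in task_paths:
        if path.endswith('.*'):
            package_name = path[:-2]
            package = importlib.import_module(package_name)
            # Only packages have __path__; iter_modules lists their submodules
            for info in pkgutil.iter_modules(package.__path__):
                modules.append('%s.%s' % (package_name, info.name))
        else:
            modules.append(path)
    return modules


# 'json' stands in for a package such as another.moduledir
result = expand_task_paths(['myjobs', 'json.*'])
print(result)
```

Plain paths are not imported here; a real loader would import every resulting module and collect the Job subclasses it defines.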

Jobs

A task queue application implements several Job classes which specify the way a Task is run. Each Job class is a Task factory; therefore a Task is always associated with one Job, which can be of two types:

  • standard (Job)
  • periodic (PeriodicJob), a generator of scheduled tasks.

Job callable method

Defining a job is simple: subclass Job and implement the job callable method:

from pulsar.apps import tasks

class Addition(tasks.Job):

    def __call__(self, consumer, a=0, b=0):
        "Add two numbers"
        return a+b

The consumer, an instance of TaskConsumer, is passed by the TaskBackend and must always be the first positional parameter of the callable method. The remaining parameters (optional, and key-valued only) are those needed by your job implementation.
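To make the calling convention concrete, here is a self-contained mock rather than pulsar code: FakeConsumer and run_job are hypothetical stand-ins for TaskConsumer and the backend's dispatch, assuming the backend passes the consumer positionally and every job argument by keyword.

```python
class FakeConsumer:
    """Hypothetical stand-in for TaskConsumer; holds the id of the running task."""

    def __init__(self, task_id):
        self.task_id = task_id


class Addition:
    """Mirrors the Addition job above without depending on pulsar."""

    def __call__(self, consumer, a=0, b=0):
        "Add two numbers"
        return a + b


def run_job(job, task_id, kwargs):
    # The consumer is always the first positional argument; job parameters
    # are passed only as keyword arguments, so their order never matters.
    return job(FakeConsumer(task_id), **kwargs)


print(run_job(Addition(), 'task-1', {'b': 3, 'a': 2}))  # 5
```

Because the job parameters all have defaults, a task queued with no arguments still runs and simply uses those defaults.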

A job callable can also return a coroutine if it needs to perform asynchronous IO during its execution:

class Crawler(tasks.Job):

    def __call__(self, consumer, sample=100, size=10):
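        # http is an asynchronous HTTP client (not defined in this snippet);
        # yield suspends the task until the response is available
        response = yield http.request(...)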
        content = response.content
        ...

This allows for cooperative task execution.
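The Crawler example uses generator-based coroutines (yield). The same cooperative idea can be sketched with Python's native async/await, which is a swapped-in technique here and not necessarily what this pulsar version accepts; asyncio.sleep stands in for the elided HTTP request. While one task waits on I/O the event loop runs the others, so three 0.1 second waits complete in roughly 0.1 seconds overall.

```python
import asyncio
import time


async def crawler(consumer, sample=100, size=10):
    # asyncio.sleep stands in for a non-blocking I/O call such as an HTTP request
    await asyncio.sleep(0.1)
    return size


async def main():
    start = time.perf_counter()
    # Three tasks run cooperatively on a single event loop
    results = await asyncio.gather(*(crawler(None, size=n) for n in (1, 2, 3)))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)  # [1, 2, 3]
print('elapsed about %.1f seconds' % elapsed)
```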

Non overlapping Jobs

The can_overlap attribute controls the way tasks are generated by a specific Job. By default, a Job creates a new task every time the TaskBackend requests it.

However, when setting the can_overlap attribute to False, a new task cannot be started unless a previous task of the same job is done.
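A minimal sketch of the non-overlapping rule, assuming the backend keeps a set of jobs that have an unfinished task. OverlapGuard is hypothetical and ignores the case where can_overlap is a callable.

```python
class OverlapGuard:
    """Hypothetical bookkeeping for jobs with can_overlap = False."""

    def __init__(self):
        self.running = set()  # non-overlapping jobs with an unfinished task

    def try_start(self, jobname, can_overlap):
        if can_overlap:
            return True  # default behaviour: always create a new task
        if jobname in self.running:
            return False  # a previous task of the same job is not done yet
        self.running.add(jobname)
        return True

    def finish(self, jobname):
        self.running.discard(jobname)


guard = OverlapGuard()
print(guard.try_start('backup', can_overlap=False))  # True
print(guard.try_start('backup', can_overlap=False))  # False
guard.finish('backup')
print(guard.try_start('backup', can_overlap=False))  # True
```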

Task Backend

The TaskBackend is at the heart of the task queue application. It exposes all the functionality for running new tasks, scheduling periodic tasks and retrieving task information. Pulsar ships with two backends: one which uses pulsar internals and stores tasks in the arbiter domain, and another which stores tasks in redis.

The backend is created by the TaskQueue as soon as it starts. It is then passed to all task queue workers which, in turn, invoke the TaskBackend.start method to start pulling tasks from the distributed task queue.

Task states

A Task can be in one of the following states (the number is the status code):

  • QUEUED = 6: a task queued but not yet executed.
  • STARTED = 5: a task whose execution has started.
  • RETRY = 4: a task retrying its calculation.
  • REVOKED = 3: the task execution has been revoked (or has timed out).
  • FAILURE = 2: the task execution has finished with a failure.
  • SUCCESS = 1: the task execution has finished successfully.

FULL_RUN_STATES

The set of states for which a Task has run: FAILURE and SUCCESS

READY_STATES

The set of states for which a Task has finished: REVOKED, FAILURE and SUCCESS
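The status codes and the two state sets can be mirrored with an IntEnum. The numeric values come from the list above; the TaskState and done names are assumptions for illustration, not pulsar's module layout.

```python
from enum import IntEnum


class TaskState(IntEnum):
    SUCCESS = 1
    FAILURE = 2
    REVOKED = 3
    RETRY = 4
    STARTED = 5
    QUEUED = 6


# A task has run when it ended with either outcome
FULL_RUN_STATES = frozenset({TaskState.FAILURE, TaskState.SUCCESS})
# A task has finished when it ran or was revoked
READY_STATES = frozenset({TaskState.REVOKED, TaskState.FAILURE, TaskState.SUCCESS})


def done(status):
    return status in READY_STATES


print(done(TaskState.REVOKED), done(TaskState.RETRY))  # True False
```

Since FULL_RUN_STATES is a subset of READY_STATES, every task that has run is also done, but a revoked task is done without having run.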

Task status broadcasting

A TaskBackend broadcasts Task state changes to three different channels, one for each of the task_queued, task_started and task_done events, via a pubsub handler (see get_pubsub()).
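A toy in-process publish/subscribe model shows how the three channels, named after the task_queued, task_started and task_done events listed under TaskBackend.channel(), can be consumed. PubSub and the 'taskqueue_' channel prefix are hypothetical; a real backend uses the pubsub handler of its store.

```python
from collections import defaultdict

EVENTS = ('task_queued', 'task_started', 'task_done')


def channel(name, prefix='taskqueue_'):
    # Hypothetical naming scheme: event name -> channel name
    if name not in EVENTS:
        raise ValueError('unknown event %r' % name)
    return prefix + name


class PubSub:
    """Hypothetical in-process pubsub: channel name -> list of callbacks."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, channel_name, callback):
        self.subscribers[channel_name].append(callback)

    def publish(self, channel_name, message):
        for callback in self.subscribers[channel_name]:
            callback(message)


received = []
pubsub = PubSub()
pubsub.subscribe(channel('task_done'), received.append)
pubsub.publish(channel('task_started'), 'task-1')  # nobody listens here
pubsub.publish(channel('task_done'), 'task-1')
print(received)  # ['task-1']
```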

Implementation

When creating a new TaskBackend there are three methods which must be implemented:

For example:

from pulsar.apps import tasks

class TaskBackend(tasks.TaskBackend):
    ...

Once the custom task backend is implemented it must be registered:

tasks.task_backends['mybackend'] = TaskBackend

And the backend will be selected via:

--task-backend mybackend://host:port
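Selecting a backend from the URL scheme can be sketched with urllib.parse. The task_backends dictionary here is a local stand-in for tasks.task_backends, and create_backend, LocalBackend and MyBackend are hypothetical.

```python
from urllib.parse import urlparse


class LocalBackend:
    def __init__(self, url):
        self.url = url


class MyBackend(LocalBackend):
    pass


# Local stand-in for tasks.task_backends: URL scheme -> backend class
task_backends = {'local': LocalBackend}
task_backends['mybackend'] = MyBackend  # the registration step shown above


def create_backend(url):
    scheme = urlparse(url).scheme
    try:
        backend_class = task_backends[scheme]
    except KeyError:
        raise ValueError('no task backend registered for %r' % scheme) from None
    return backend_class(url)


backend = create_backend('mybackend://127.0.0.1:6379')
print(type(backend).__name__)  # MyBackend
```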

API

List of all classes used by this application.

Task queue application

class pulsar.apps.tasks.TaskQueue(callable=None, load_config=True, **params)

A pulsar Application for consuming tasks.

This application can also schedule periodic tasks when the schedule_periodic flag is True.

backend = None

The TaskBackend for this task queue.

Available once the TaskQueue has started.

monitor_start(monitor)

Starts running the task queue in monitor.

It calls the Application.callable (if available) and creates the backend.

monitor_task(monitor)

Override the monitor_task() callback.

Check if the backend needs to schedule new tasks.

Job class

class pulsar.apps.tasks.models.Job

The Job class which is used in a distributed task queue.

name

The unique name which defines the Job and which can be used to retrieve it from the job registry. This attribute is set to the Job class name in lower case by default, unless a name class attribute is defined.

abstract

If set to True (default is False), the Job won’t be registered with the JobRegistry. Useful when creating a new base class for several other jobs.

type

Type of Job, either regular or periodic.

timeout

An instance of a datetime.timedelta or None. If set, it represents the time lag after which a task which did not start expires.

Default: None.

can_overlap

Boolean indicating if this job can generate overlapping tasks. It can also be a callable which accepts the same input parameters as the job callable function.

Default: True.

doc_syntax

The doc string syntax.

Default: markdown

logger

An instance of a logger, created at runtime.

queue_task(consumer, jobname, meta_params=None, **kwargs)

Queue a new task in the task queue.

This utility method can be used from within the job callable method and it allows tasks to act as task factories.

Parameters:
  • consumer – the TaskConsumer handling the Task. Must be the same instance as the one passed to the job callable method.
  • jobname – The name of the Job to run.
  • kwargs – key-valued parameters for the job callable.
Returns: a Future called back with the task id.

This method invokes the TaskBackend.queue_task() method with the additional from_task argument equal to the id of the task invoking the method.
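The task-factory pattern can be illustrated with an asyncio mock. MiniBackend is hypothetical: it only records the from_task link described above and resolves the returned future with a new id (uuid4 is an assumption, not pulsar's id scheme).

```python
import asyncio
import uuid


class MiniBackend:
    """Hypothetical backend that records queued tasks in a dict."""

    def __init__(self):
        self.tasks = {}

    def queue_task(self, jobname, from_task=None, **kwargs):
        future = asyncio.get_running_loop().create_future()
        task_id = uuid.uuid4().hex
        self.tasks[task_id] = {'job': jobname, 'from_task': from_task, 'kwargs': kwargs}
        future.set_result(task_id)  # called back with the task id
        return future


async def parent_job(backend, parent_id):
    # A job acting as a task factory: it queues a child task and keeps its id
    child_id = await backend.queue_task('addition', from_task=parent_id, a=1, b=2)
    return child_id


backend = MiniBackend()
child_id = asyncio.run(parent_job(backend, 'parent-1'))
print(backend.tasks[child_id]['from_task'])  # parent-1
```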

Periodic job

class pulsar.apps.tasks.models.PeriodicJob(run_every=None)

A periodic Job implementation.

anchor = None

If specified it must be a datetime instance. It controls when the periodic Job is run.

run_every = None

Periodicity as a timedelta instance.

is_due(last_run_at)

Returns a tuple of two items (is_due, next_time_to_run), where next_time_to_run is in seconds. For example:

  • (True, 20), means the job should be run now, and the next time to run is in 20 seconds.
  • (False, 12), means the job should be run in 12 seconds.

You can override this to decide the interval at runtime.
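One possible is_due() for a fixed run_every, assuming last_run_at is a datetime and ignoring anchor. This is a standalone function rather than the PeriodicJob method; the arithmetic reproduces the (False, 12) and (True, 20) style tuples described above.

```python
from datetime import datetime, timedelta


def is_due(last_run_at, run_every, now=None):
    """Return (is_due, next_time_to_run) with next_time_to_run in seconds."""
    if now is None:
        now = datetime.now()
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining > 0:
        return False, remaining
    # Due now; the next run is one full period away
    return True, run_every.total_seconds()


now = datetime(2024, 1, 1, 12, 0, 0)
every = timedelta(seconds=20)
print(is_due(now - timedelta(seconds=8), every, now))   # (False, 12.0)
print(is_due(now - timedelta(seconds=25), every, now))  # (True, 20.0)
```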

Job registry

class pulsar.apps.tasks.models.JobRegistry

Site registry for tasks.

regular()

A generator of all regular jobs.

periodic()

A generator of all periodic jobs.

register(job)

Register a job in the job registry.

The job will be automatically instantiated if it is not already an instance.

filter_types(type)

Return a generator of all jobs of a specific type.
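A simplified registry, assuming the behaviour described for Job and JobRegistry: names default to the lower-cased class name, abstract jobs are skipped, and classes are instantiated on registration. Job, PeriodicJob and JobRegistry below are minimal stand-ins, not pulsar's classes; in particular, abstract is read from the class's own namespace so subclasses do not inherit it.

```python
class Job:
    abstract = False
    type = 'regular'

    @property
    def name(self):
        # Default: class name in lower case; a subclass may set name = '...'
        return self.__class__.__name__.lower()


class PeriodicJob(Job):
    abstract = True  # a base class for periodic jobs, never registered itself
    type = 'periodic'


class JobRegistry(dict):
    def register(self, job):
        if isinstance(job, type):
            if job.__dict__.get('abstract', False):
                return None  # abstract base classes are skipped
            job = job()  # instantiate if not already an instance
        self[job.name] = job
        return job

    def filter_types(self, type_):
        return (job for job in self.values() if job.type == type_)

    def regular(self):
        return self.filter_types('regular')

    def periodic(self):
        return self.filter_types('periodic')


class Addition(Job):
    pass


class Cleanup(PeriodicJob):
    name = 'nightly-cleanup'  # explicit name overrides the default


registry = JobRegistry()
for cls in (Addition, Cleanup, PeriodicJob):
    registry.register(cls)
print(sorted(registry))  # ['addition', 'nightly-cleanup']
```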

Task

class pulsar.apps.tasks.backend.Task(*args, **kwargs)

A data Model containing task execution data.

id = None

Task unique identifier.

time_ended = None

The timestamp indicating when this task finished.

expiry = None

The timestamp indicating when this task expires.

If the task is not started before this value it is REVOKED.
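The expiry rule can be sketched as follows, assuming expiry is computed as queue time plus Job.timeout and checked when a worker picks the task up. compute_expiry and status_on_pickup are hypothetical helpers.

```python
from datetime import datetime, timedelta


def compute_expiry(queued_at, timeout):
    # Job.timeout is a timedelta or None; None means the task never expires
    return queued_at + timeout if timeout is not None else None


def status_on_pickup(expiry, now):
    # A task that was not started before its expiry is REVOKED
    if expiry is not None and now > expiry:
        return 'REVOKED'
    return 'STARTED'


queued = datetime(2024, 1, 1, 12, 0)
expiry = compute_expiry(queued, timedelta(minutes=5))
print(status_on_pickup(expiry, queued + timedelta(minutes=2)))  # STARTED
print(status_on_pickup(expiry, queued + timedelta(minutes=7)))  # REVOKED
```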

status = None

Flag indicating the task status.

done()

Return True if the Task has finished.

Its status is one of READY_STATES.

status_string()

A string representation of the status code.

TaskBackend

class pulsar.apps.tasks.backend.TaskBackend(store, logger=None, task_paths=None, schedule_periodic=False, backlog=1, max_tasks=0, name=None, poll_timeout=None)

A backend class for running Task. A TaskBackend is responsible for creating tasks and putting them into the distributed queue. It also schedules the run of periodic tasks if enabled to do so.

task_paths

List of paths from which to load jobs, which are task factories. Passed by the task-queue application task_paths setting.

schedule_periodic

True if this TaskBackend can schedule periodic tasks.

Passed by the task-queue application schedule-periodic setting.

backlog

The maximum number of concurrent tasks running on a task-queue for an Actor. A number in the order of 5 to 10 is normally used. Passed by the task-queue application concurrent tasks setting.

max_tasks

The maximum number of tasks a worker will process before restarting. Passed by the task-queue application max requests setting.

poll_timeout

The (asynchronous) timeout for polling tasks from the task queue.

It is always a positive number and it can be specified via the backend connection string:

local://?poll_timeout=3

There shouldn’t be any reason to modify the default value.

Default: 2.
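Reading poll_timeout from the connection string can be sketched with urllib.parse. parse_poll_timeout is a hypothetical helper that applies the documented default of 2 and rejects non-positive values.

```python
from urllib.parse import parse_qs, urlparse

DEFAULT_POLL_TIMEOUT = 2


def parse_poll_timeout(url):
    query = parse_qs(urlparse(url).query)
    values = query.get('poll_timeout')
    if not values:
        return DEFAULT_POLL_TIMEOUT
    timeout = float(values[0])
    if timeout <= 0:
        raise ValueError('poll_timeout must be a positive number')
    return timeout


print(parse_poll_timeout('local://?poll_timeout=3'))  # 3.0
print(parse_poll_timeout('local://'))                 # 2
```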

processed

The number of tasks processed (so far) by the worker running this backend. This value is important in connection with the max_tasks attribute.

num_concurrent_tasks

The number of tasks currently being executed concurrently.

This number is never greater than the backlog attribute.
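The relation between backlog and num_concurrent_tasks can be sketched with asyncio.Semaphore, a swapped-in mechanism rather than pulsar's own. BoundedWorker is hypothetical: it never runs more than backlog tasks at once and counts processed tasks, the value compared with max_tasks.

```python
import asyncio


class BoundedWorker:
    def __init__(self, backlog=5):
        self.backlog = backlog
        self.num_concurrent_tasks = 0
        self.peak = 0        # highest concurrency observed
        self.processed = 0   # compare with max_tasks to decide when to restart
        self._slots = asyncio.Semaphore(backlog)

    async def run(self, job):
        async with self._slots:  # waits while backlog tasks are already running
            self.num_concurrent_tasks += 1
            self.peak = max(self.peak, self.num_concurrent_tasks)
            try:
                return await job()
            finally:
                self.num_concurrent_tasks -= 1
                self.processed += 1


async def io_job():
    await asyncio.sleep(0.01)  # stands in for asynchronous I/O


async def main():
    worker = BoundedWorker(backlog=5)
    await asyncio.gather(*(worker.run(io_job) for _ in range(20)))
    return worker


worker = asyncio.run(main())
print(worker.peak, worker.processed)  # 5 20
```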

registry

The JobRegistry for this backend.

channel(name)

Given an event name returns the corresponding channel name.

The event name is one of task_queued, task_started or task_done

queue_task(*args, **kwargs)

Try to queue a new Task.

This method returns a Future which results in the id of the created task. If jobname is not a valid Job.name, a TaskNotAvailable exception is raised.

Parameters:
  • jobname – the name of a Job registered with the TaskQueue application.
  • meta_params – Additional parameters to be passed to the Task constructor (not its callable function).
  • expiry – optional expiry timestamp to override the default expiry of a task.
  • kwargs – optional dictionary used for the key-valued arguments in the task callable.
Returns: a Future resulting in a task id on success.

wait_for_task(task_id, timeout=None)

Asynchronously wait for a task with task_id to have finished its execution.
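Waiting for completion with an optional timeout can be sketched with one asyncio.Event per task. TaskWaiter and its methods are hypothetical; a real backend would react to the task_done channel rather than to a direct call.

```python
import asyncio


class TaskWaiter:
    def __init__(self):
        self._events = {}
        self.results = {}

    def _event(self, task_id):
        return self._events.setdefault(task_id, asyncio.Event())

    def task_done(self, task_id, result):
        # Called when a task reaches one of the READY_STATES
        self.results[task_id] = result
        self._event(task_id).set()

    async def wait_for_task(self, task_id, timeout=None):
        # asyncio.wait_for raises asyncio.TimeoutError when the timeout expires
        await asyncio.wait_for(self._event(task_id).wait(), timeout)
        return self.results[task_id]


async def main():
    waiter = TaskWaiter()
    # Simulate task-1 finishing after 50 ms
    asyncio.get_running_loop().call_later(0.05, waiter.task_done, 'task-1', 'SUCCESS')
    first = await waiter.wait_for_task('task-1', timeout=1)
    try:
        second = await waiter.wait_for_task('task-2', timeout=0.05)
    except asyncio.TimeoutError:
        second = 'timed out'  # task-2 never finishes
    return first, second


outcome = asyncio.run(main())
print(outcome)  # ('SUCCESS', 'timed out')
```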

get_pubsub()

Create a publish/subscribe handler from the backend store.

maybe_queue_task(task)

Actually queue a Task if possible.

get_task(task_id=None)

Asynchronously retrieve a Task from a task_id.

Parameters: task_id – the id of the task to retrieve.
Returns: a Task or None.

finish_task(task_id, lock_id)

Invoked at the end of task execution.

The Task with task_id has been executed (either successfully or not) or has been revoked. This method performs backend-specific operations.

Must be implemented by subclasses.

flush()

Remove all queued tasks.

start(worker)

Invoked by the task queue worker when it starts.

close()

Close this TaskBackend.

Invoked by the Actor when stopping.

generate_task_ids(job, kwargs)

An internal method to generate task unique identifiers.

Parameters:
  • job – The Job creating the task.
  • kwargs – dictionary of key-valued parameters passed to the job callable method.
Returns: a two-element tuple containing the unique id and, if Job.can_overlap evaluates to False, an identifier for overlapping tasks.

Called by the TaskBackend when creating a new task.
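A plausible shape for generate_task_ids(), assuming a random unique id plus, for non-overlapping jobs, a deterministic digest of the job name and its keyword arguments so that identical calls share one overlap identifier. This illustrates the idea and is not pulsar's algorithm; the job is represented by its name and can_overlap value for simplicity.

```python
import hashlib
import json
import uuid


def generate_task_ids(job_name, can_overlap, kwargs):
    task_id = uuid.uuid4().hex
    if can_overlap:
        return task_id, None
    # Same job + same arguments -> same overlap id, whatever the dict order
    payload = json.dumps([job_name, kwargs], sort_keys=True)
    overlap_id = hashlib.sha1(payload.encode('utf-8')).hexdigest()
    return task_id, overlap_id


id1, lock1 = generate_task_ids('backup', False, {'db': 'main', 'full': True})
id2, lock2 = generate_task_ids('backup', False, {'full': True, 'db': 'main'})
print(id1 != id2, lock1 == lock2)  # True True
```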

TaskConsumer

class pulsar.apps.tasks.backend.TaskConsumer(backend, worker, task_id, job)

A context manager for consuming tasks.

Instances of this consumer are created by the TaskBackend when a task is executed.

_loop

the queue-based loop of the thread executing the task.

task_id

the Task.id being consumed.

job

the Job which generated the task.

worker

the Actor executing the task.

backend

The TaskBackend. This is useful when creating tasks from within a job callable.

Scheduler Entry

class pulsar.apps.tasks.backend.SchedulerEntry(name, run_every, anchor=None)

A class used as a schedule entry by the TaskBackend.

name

Task name

run_every

Interval in seconds

anchor

Datetime anchor

last_run_at

last run datetime

total_run_count

Total number of times this periodic task has been executed by the TaskBackend.

scheduled_last_run_at

The scheduled last run datetime.

This is different from last_run_at only when anchor is set.

next(now=None)

Increase the total_run_count attribute by one and set the value of last_run_at to now.

is_due(now=None)

Returns a tuple of two items (is_due, next_time_to_run), where next_time_to_run is in seconds.

See pulsar.apps.tasks.models.PeriodicJob.is_due() for more information.

TaskQueue Rpc Mixin

The TaskQueue application does not expose an external API to run new tasks or retrieve task information. The TaskQueueRpcMixin class can be used to achieve just that.

It is a JSONRPC handler which exposes six functions for executing tasks and retrieving task information.

The task-queue example shows how to use this class in the context of a WSGI server running alongside the task-queue application.

class pulsar.apps.tasks.rpc.TaskQueueRpcMixin(taskqueue, **kwargs)

A JSONRPC mixin for communicating with a TaskQueue.

To use it, you need to have an RPC application and a task queue application installed in the Arbiter.

Parameters: taskqueue – instance or name of the TaskQueue application which exposes the remote procedure calls.

rpc_job_list(request, jobnames=None)

Return the list of Jobs registered with the task queue, together with meta information.

If a list of jobnames is given, it returns only jobs included in the list.

rpc_queue_task(request, jobname=None, **kw)

Queue a new task for jobname in the task queue.

The task can be of any type as long as it is registered in the task queue registry. To check the available tasks call the rpc_job_list() function.

It returns the task id.
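For illustration, a JSON-RPC 2.0 request body that queues a task might be built as below. The method name queue_task assumes the handler exposes rpc_queue_task without its rpc_ prefix, and the addition job and its arguments are hypothetical; the task-queue example is the place to confirm the actual endpoint and naming.

```python
import json


def queue_task_request(jobname, request_id=1, **kwargs):
    # jobname selects the Job; the remaining keys become the job's kwargs
    params = dict(kwargs, jobname=jobname)
    return json.dumps({
        'jsonrpc': '2.0',
        'id': request_id,
        'method': 'queue_task',  # assumed name for rpc_queue_task
        'params': params,
    })


body = queue_task_request('addition', a=1, b=2)
print(body)
```

A successful response would carry the new task id in its result member, which can then be passed to the wait and retrieval calls listed below.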

rpc_get_task(request, id=None)

Retrieve a task from its id

rpc_get_tasks(request, **filters)

Retrieve a list of tasks which satisfy key-valued filters

rpc_wait_for_task(request, id=None, timeout=None)

Wait for a task to have finished.

Parameters:
  • id – the id of the task to wait for.
  • timeout – optional timeout in seconds.
Returns: the JSON representation of the task once it has finished.

rpc_num_tasks(request)

Return the approximate number of tasks in the task queue.

task_request_parameters(request)

Internal function which returns a dictionary of parameters to be passed to the Task class constructor.

This function can be overridden to add information about the type of request, who made the request and so forth. It must return a dictionary. By default it returns an empty dictionary.


