Documentation for pulsar's DEVELOPMENT version.
Pulsar ships with an asynchronous TaskQueue for distributing work across threads, processes or machines. It is highly customisable: it can run in multi-threading or multiprocessing (default) mode and can share tasks across several machines. You create Job classes much as you would for celery, and the application provides everything needed to run them with very little setup:
    from pulsar.apps import tasks

    if __name__ == '__main__':
        tasks.TaskQueue(task_paths=['path.to.tasks.*']).start()
Check the task queue tutorial for a running example with simple tasks.
To get started, follow these simple guidelines:
task_paths = ['myjobs','another.moduledir.*']
The * at the end of the second entry indicates that Job classes are collected from all submodules of another.moduledir.
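The wildcard rule can be illustrated with a small sketch. This is not pulsar's loader: `expand_task_path` is a hypothetical helper, and it only lists the direct submodules of a package, using the standard library `json` package as a stand-in for a jobs package.

```python
import importlib
import pkgutil


def expand_task_path(path):
    """Return the module names matched by one task path entry.

    Hypothetical helper, not pulsar's implementation: a trailing ``.*``
    is read as "every direct submodule of this package".
    """
    if not path.endswith('.*'):
        return [path]
    package = importlib.import_module(path[:-2])
    # pkgutil.iter_modules lists the modules found on the package path.
    return [package.__name__ + '.' + info.name
            for info in pkgutil.iter_modules(package.__path__)]
```

An entry without the wildcard is returned unchanged, while `expand_task_path('json.*')` yields names such as `json.decoder` and `json.encoder`.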
It can be specified on the command line via the --schedule-periodic flag.
The task_backend parameter is a URL-style string which specifies the task backend to use.
It can be specified on the command line via the --task-backend ... option.
The concurrent_tasks parameter controls the maximum number of concurrent tasks for a given task worker. This parameter is important when tasks are asynchronous, that is when they perform some sort of I/O and the job callable returns an asynchronous component.
It can be specified on the command line via the --concurrent-tasks ... option.
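To see why a cap on concurrent tasks matters for I/O-bound jobs, here is a minimal sketch using the standard library `asyncio` rather than pulsar's own asynchronous machinery. The names `run_with_limit`, `fake_io_task` and `demo` are hypothetical, and the semaphore is only one possible way to enforce such a limit.

```python
import asyncio


async def run_with_limit(coros, concurrent_tasks):
    """Run coroutines with at most ``concurrent_tasks`` in flight."""
    limit = asyncio.Semaphore(concurrent_tasks)
    running = 0
    peak = 0

    async def guarded(coro):
        nonlocal running, peak
        async with limit:
            running += 1
            peak = max(peak, running)
            try:
                return await coro
            finally:
                running -= 1

    results = await asyncio.gather(*(guarded(c) for c in coros))
    return results, peak


async def fake_io_task(n):
    # Stand-in for a job callable that waits on I/O.
    await asyncio.sleep(0.01)
    return n * 2


def demo(concurrent_tasks=3, total=10):
    coros = [fake_io_task(n) for n in range(total)]
    return asyncio.run(run_with_limit(coros, concurrent_tasks))
```

`demo()` returns the task results together with the highest number of tasks that were running at the same time, which never exceeds the configured limit.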
An Application for consuming Tasks.
This application can also schedule periodic tasks when the schedule_periodic flag is True.
The TaskBackend for this task queue.
Starts running the task queue in monitor.
Defining a job is simple: subclass Job and implement the job callable method:
    from pulsar.apps import tasks

    class Addition(tasks.Job):

        def __call__(self, consumer, a, b):
            "Add two numbers"
            return a+b
The consumer, instance of TaskConsumer, is passed by the Task backend and should always be the first positional parameter in the callable method. The remaining (optional) positional and key-valued parameters are needed by your job implementation.
    class Crawler(tasks.Job):

        def __call__(self, consumer, sample, size=10):
            response = yield http.request(...)
            content = response.content
            ...
This allows for cooperative task execution on each task worker thread.
However, when setting the can_overlap attribute to False, a new task cannot be started unless a previous task of the same job is done.
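The overlap rule can be pictured as a simple guard keyed on the job name. The sketch below is an assumption about how such a check might look, not pulsar's implementation; `OverlapGuard` and its methods are hypothetical.

```python
class OverlapGuard:
    """Track running jobs and refuse overlapping starts.

    Hypothetical helper, not part of pulsar.
    """

    def __init__(self):
        self._running = set()

    def try_start(self, job_name, can_overlap=True):
        # A job that may overlap is always allowed to start.
        if can_overlap:
            return True
        # Otherwise a task starts only if no task of the same job runs.
        if job_name in self._running:
            return False
        self._running.add(job_name)
        return True

    def finish(self, job_name):
        # Called when a task of the job is done; frees the slot.
        self._running.discard(job_name)
```

A second `try_start('crawler', can_overlap=False)` returns False until `finish('crawler')` has been called.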
The TaskBackend is at the heart of the task queue application. It exposes all the functionality for running new tasks, scheduling periodic tasks and retrieving task information. Pulsar ships with two backends: one uses pulsar internals and stores tasks in the arbiter domain, the other stores tasks in redis.
The backend is created by the TaskQueue as soon as it starts. It is then passed to all task queue workers which, in turn, invoke the TaskBackend.start method to start pulling tasks from the distributed task queue.
When creating a new TaskBackend there are six methods which must be implemented:
The set of states for which a Task has run: FAILURE and SUCCESS
The set of states for which a Task has finished: REVOKED, FAILURE and SUCCESS
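The relationship between the two sets is easy to check in code. In this sketch the state names FAILURE, SUCCESS and REVOKED come from the text above; the constant names `RUN_STATES` and `FINISHED_STATES`, the helper functions and the extra STARTED state are assumptions made for illustration.

```python
# State names taken from the descriptions above; STARTED is an assumed
# extra state, included only to show a task that has not finished.
STARTED = 'STARTED'
REVOKED, FAILURE, SUCCESS = 'REVOKED', 'FAILURE', 'SUCCESS'

# Hypothetical constant names for the two sets.
RUN_STATES = frozenset((FAILURE, SUCCESS))
FINISHED_STATES = frozenset((REVOKED, FAILURE, SUCCESS))


def has_run(state):
    """True when the task was actually executed."""
    return state in RUN_STATES


def has_finished(state):
    """True when the task will not change state any more."""
    return state in FINISHED_STATES
```

A REVOKED task has finished without ever having run, which is why the first set is a strict subset of the second.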
The Job class which is used in a distributed task queue.
The unique name which defines the Job and which can be used to retrieve it from the job registry. This attribute is set to the Job class name in lower case by default, unless a name class attribute is defined.
If set to True (default is False), the Job won’t be registered with the JobRegistry. Useful when creating a new base class for several other jobs.
Type of Job, one of regular and periodic.
An instance of a datetime.timedelta or None. If set, it represents the time lag after which a task which did not start expires.
Boolean indicating if this job can generate overlapping tasks. It can also be a callable which accept the same input parameters as the job callable function.
The doc string syntax.
An instance of a logger, created at runtime.
Queue a new task in the task queue.
This utility method can be used from within the job callable method and it allows tasks to act as tasks factories.
Returns a Deferred called back with the task id.
This method invokes the TaskBackend.queue_task() method with the additional from_task argument equal to the id of the task invoking the method.
A periodic Job implementation.
If specified it must be a datetime.datetime instance. It controls when the periodic Job is run.
Periodicity as a datetime.timedelta instance.
Returns a tuple of two items (is_due, next_time_to_run), where next_time_to_run is in seconds. For example:
(True, 20) means the Job should be run now, and the next time to run is in 20 seconds.
(False, 12) means the Job should be run in 12 seconds.
You can override this to decide the interval at runtime.
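The return convention can be sketched as a stand-alone function. This is a hypothetical version written from the description above, not pulsar's code; the parameter names `last_run_at`, `run_every` and `now` are assumptions.

```python
from datetime import datetime, timedelta


def is_due(last_run_at, run_every, now):
    """Return (is_due, next_time_to_run) with the delay in seconds.

    Hypothetical stand-alone version of the rule described above.
    """
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining <= 0:
        # Due now; the following run is one full period away.
        return True, run_every.total_seconds()
    return False, remaining
```

With a 20 second period, a job last run 8 seconds ago gives `(False, 12.0)`, while one last run 20 seconds ago gives `(True, 20.0)`.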
Site registry for tasks.
A generator of all regular jobs.
A generator of all periodic jobs.
Register a job in the job registry.
The task will be automatically instantiated if not already an instance.
Return a generator of all tasks of a specific type.
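A minimal sketch of such a registry follows, assuming the naming and abstract rules described for Job. It is a simplified stand-in, not pulsar's classes: the method name `filter_types`, the module-level `registry` and the use of `__init_subclass__` for automatic registration are all assumptions.

```python
class JobRegistry(dict):
    """Map job names to job instances (simplified stand-in)."""

    def register(self, job):
        # Instantiate when a class is passed instead of an instance.
        if isinstance(job, type):
            job = job()
        self[job.name] = job

    def filter_types(self, job_type):
        return (job for job in self.values() if job.type == job_type)


registry = JobRegistry()


class Job:
    abstract = True
    type = 'regular'

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Default name: class name in lower case, unless one is declared.
        if 'name' not in cls.__dict__:
            cls.name = cls.__name__.lower()
        # ``abstract`` is not inherited: only classes setting it are skipped.
        if not cls.__dict__.get('abstract', False):
            registry.register(cls)


class PeriodicJob(Job):
    abstract = True
    type = 'periodic'


class Addition(Job):
    pass


class Cleanup(PeriodicJob):
    name = 'nightly.cleanup'
```

After these definitions the registry holds `addition` and `nightly.cleanup`; the abstract base classes are not registered.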
A model containing task execution data.
Task unique identifier.
The timestamp indicating when this task has finished.
The timestamp indicating when this task expires.
If the task is not started before this value it is REVOKED.
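The expiry rule can be sketched with plain datetime arithmetic. The function names below are hypothetical and the PENDING state is an assumption; only REVOKED and the timedelta-or-None timeout come from the text.

```python
from datetime import datetime, timedelta


def expiry_time(queued_at, timeout):
    """Return when a queued task expires, or None if it never does."""
    if timeout is None:
        return None
    return queued_at + timeout


def status_at_start(expiry, now, status='PENDING'):
    """Return REVOKED if the task is picked up after its expiry.

    ``PENDING`` is an assumed name for the not-yet-started state.
    """
    if expiry is not None and now > expiry:
        return 'REVOKED'
    return status
```

A task queued with a 30 second timeout and picked up 31 seconds later would be marked REVOKED instead of being executed.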
True if this TaskBackend can schedule periodic tasks.
Passed by the task-queue application schedule-periodic setting.
The maximum number of tasks a worker will process before restarting. Passed by the task-queue application max requests setting.
The (asynchronous) timeout for polling tasks from the task queue.
It is always a positive number and it can be specified via the backend connection string:
There shouldn’t be any reason to modify the default value.
The number of tasks processed (so far) by the worker running this backend. This value is important in connection with the max_tasks attribute.
The number of concurrent_tasks.
This number is never greater than the backlog attribute.
Try to queue a new Task.
Returns a Deferred resulting in a task id on success.
Asynchronously wait for a task with task_id to have finished its execution.
Actually queue a task if possible.
Asynchronously retrieve a Task from a task_id.
Returns a Task or None.
The publish/subscribe handler.
Invoked by the task queue worker when it starts.
Here, the worker creates its thread pool via Actor.create_thread_pool() and registers the may_pool_task() callback in its event loop.
An internal method to generate task unique identifiers.
Returns a two-element tuple containing the unique id and an identifier for overlapping tasks if can_overlap evaluates to False.
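One way to picture this pair of identifiers is sketched below. The function name `make_task_ids`, the use of `uuid4` for the unique id and the SHA-1 hash of the job name and input for the overlap identifier are all assumptions, not pulsar's actual scheme.

```python
import hashlib
import uuid


def make_task_ids(job_name, can_overlap, args, kwargs):
    """Return (task_id, overlap_id); overlap_id is None if overlap is allowed.

    Hypothetical scheme for illustration only.
    """
    task_id = uuid.uuid4().hex
    if can_overlap:
        return task_id, None
    # Same job and same input always hash to the same overlap identifier,
    # so a backend could look it up to detect a task that is still running.
    key = repr((job_name, args, sorted(kwargs.items())))
    return task_id, hashlib.sha1(key.encode('utf-8')).hexdigest()
```

Two calls for the same non-overlapping job and input produce different task ids but the same overlap identifier.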
Called by the TaskBackend when creating a new task.
A context manager for consuming tasks.
Instances of this consumer are created by the TaskBackend when a task is executed.
The Job which generated the task.
A class used as a schedule entry by the TaskBackend.
Interval in seconds
last run datetime
The scheduled last run datetime.
Returns tuple of two items (is_due, next_time_to_run), where next time to run is in seconds.
See unuk.contrib.tasks.models.PeriodicTask.is_due() for more information.
It is a JSONRPC handler which exposes six functions for executing tasks and retrieving task information.
The task-queue example shows how to use this class in the context of a WSGI server running alongside the task-queue application.
Parameters: taskqueue – instance or name of the TaskQueue application which exposes the remote procedure calls.
Return the list of Jobs registered with task queue with meta information.
If a list of jobnames is given, it returns only jobs included in the list.
Queue a new jobname in the task queue.
The task can be of any type as long as it is registered in the task queue registry. To check the available tasks call the rpc_job_list() function.
It returns the task id.
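From the client side, queuing a job amounts to posting a JSON-RPC request to the server. The sketch below builds such a request body with the standard library only; the JSON-RPC 2.0 envelope, the remote method name `queue_task` and the parameter name `jobname` are assumptions made for illustration and should be checked against the running server.

```python
import json


def jsonrpc_request(method, request_id, **params):
    """Serialise a JSON-RPC 2.0 request with named parameters."""
    return json.dumps({
        'jsonrpc': '2.0',
        'method': method,
        'params': params,
        'id': request_id,
    })


# ``queue_task`` and ``jobname`` are assumed names for this illustration.
body = jsonrpc_request('queue_task', 1, jobname='addition', a=1, b=2)
```

The resulting string can be sent as the body of an HTTP POST; on success the result field of the response would carry the task id.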
Retrieve a task from its id
Retrieve a list of tasks which satisfy key-valued filters
Wait for a task to have finished.
Returns the JSON representation of the task once it has finished.
Return the approximate number of tasks in the task queue.
Internal function which returns a dictionary of parameters to be passed to the Task class constructor.
This function can be overridden to add information about the type of request, who made the request and so forth. It must return a dictionary. By default it returns an empty dictionary.