Pulsar ships with an asynchronous TaskQueue for distributing work across threads, processes or machines. It is highly customisable: it can run in multi-threading or multi-processing (default) mode and can share tasks across several machines.
    from pulsar.apps import tasks

    if __name__ == '__main__':
        tasks.TaskQueue(task_paths=['path.to.tasks.*']).start()
Check the task queue tutorial for a running example with simple tasks.
To get started, follow these guidelines:
task_paths = ['myjobs','another.moduledir.*']
The * at the end of the second entry tells the task queue to collect Job classes from all submodules of another.moduledir.
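The wildcard expansion described above can be sketched with the standard library alone. This is only an illustration of the idea, not pulsar's own import logic: the helper name expand_task_paths is hypothetical, it only looks at direct submodules, and the standard json package stands in for a package of jobs.

```python
import importlib
import pkgutil


def expand_task_paths(paths):
    """Expand entries ending in '.*' into the package's direct submodules.

    Hypothetical helper: pulsar's real collection code may behave differently.
    """
    modules = []
    for path in paths:
        if path.endswith('.*'):
            package = importlib.import_module(path[:-2])
            # Only packages expose __path__, which lists where submodules live.
            for info in pkgutil.iter_modules(package.__path__):
                modules.append('%s.%s' % (package.__name__, info.name))
        else:
            # A plain entry names a single module and is kept as it is.
            modules.append(path)
    return modules


# The standard library 'json' package stands in for a package of jobs.
print(expand_task_paths(['myjobs', 'json.*']))
```

Entries without a trailing wildcard are passed through untouched and are not imported by this sketch.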
It can be specified on the command line via the --schedule-periodic flag.
It can be specified on the command line via the --task-backend ... option.
The concurrent_tasks parameter controls the maximum number of concurrent tasks for a given task worker. This parameter is important when tasks are asynchronous, that is, when they perform some sort of I/O and the job callable returns an asynchronous component.
It can be specified on the command line via the --concurrent-tasks ... option.
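To see why a cap on concurrent tasks matters for I/O-bound jobs, here is a minimal sketch using asyncio from the standard library. It does not use pulsar's worker; run_with_limit and fake_io_task are hypothetical names, and the semaphore simply plays the role that concurrent_tasks plays for a task worker.

```python
import asyncio


async def run_with_limit(coros, concurrent_tasks=2):
    """Run coroutines while keeping at most `concurrent_tasks` in flight."""
    semaphore = asyncio.Semaphore(concurrent_tasks)
    running = 0
    peak = 0

    async def guarded(coro):
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            try:
                return await coro
            finally:
                running -= 1

    results = await asyncio.gather(*(guarded(c) for c in coros))
    # `peak` records the highest number of tasks that ran at the same time.
    return results, peak


async def fake_io_task(n):
    # Stand-in for a job that waits on I/O before returning a result.
    await asyncio.sleep(0.01)
    return n * 2
```

Calling asyncio.run(run_with_limit([fake_io_task(i) for i in range(6)])) returns the results in submission order together with the observed peak concurrency.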
A task queue application implements several Job classes which specify the way a Task is run. Each Job class is a Task factory; therefore, a Task is always associated with one Job, which can be one of two types, regular or periodic.
Defining a job is simple: subclass Job and implement the job callable method:
    from pulsar.apps import tasks

    class Addition(tasks.Job):

        def __call__(self, consumer, a=0, b=0):
            "Add two numbers"
            return a + b
The consumer, an instance of TaskConsumer, is passed by the Task backend and should always be the first positional parameter of the callable method. The remaining parameters (optional and key-valued only!) are those needed by your job implementation.
    class Crawler(tasks.Job):

        def __call__(self, consumer, sample=100, size=10):
            response = yield http.request(...)
            content = response.content
            ...
This allows for cooperative task execution.
However, when setting the can_overlap attribute to False, a new task cannot be started unless a previous task of the same job is done.
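The effect of can_overlap can be pictured with a small bookkeeping class. This is a sketch under simplifying assumptions, not the backend's implementation: OverlapGuard is a hypothetical name, and it keys overlaps on the job name only, whereas a real backend may also take the task arguments into account.

```python
from collections import Counter


class OverlapGuard:
    """Track running tasks per job and refuse overlaps when not allowed.

    Hypothetical helper used for illustration only.
    """

    def __init__(self):
        self._running = Counter()

    def try_start(self, job_name, can_overlap=True):
        if not can_overlap and self._running[job_name] > 0:
            # A previous task of the same job has not finished yet.
            return False
        self._running[job_name] += 1
        return True

    def finish(self, job_name):
        if self._running[job_name] > 0:
            self._running[job_name] -= 1


guard = OverlapGuard()
print(guard.try_start('crawler', can_overlap=False))   # first task starts
print(guard.try_start('crawler', can_overlap=False))   # refused while running
```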
The TaskBackend is at the heart of the task queue application. It exposes all the functionality for running new tasks, scheduling periodic tasks and retrieving task information. Pulsar ships with two backends: one uses pulsar internals and stores tasks in the arbiter domain, the other stores tasks in redis.
The backend is created by the TaskQueue as soon as it starts. It is then passed to all task queue workers which, in turn, invoke the TaskBackend.start method to start pulling tasks from the distributed task queue.
The set of states for which a Task has run: FAILURE and SUCCESS
The set of states for which a Task has finished: REVOKED, FAILURE and SUCCESS
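The two sets of states above map naturally onto frozensets. The sketch below assumes the constant names FULL_RUN_STATES and READY_STATES and represents states as plain strings; both choices are illustrative and may not match the library's definitions.

```python
# State names taken from the text; string values are an assumption.
SUCCESS = 'SUCCESS'
FAILURE = 'FAILURE'
REVOKED = 'REVOKED'

# States for which a task has actually run.
FULL_RUN_STATES = frozenset((FAILURE, SUCCESS))
# States for which a task has finished, whether or not it ran.
READY_STATES = frozenset((REVOKED, FAILURE, SUCCESS))


def has_run(state):
    return state in FULL_RUN_STATES


def has_finished(state):
    return state in READY_STATES


# A revoked task has finished without ever running.
print(has_finished(REVOKED), has_run(REVOKED))
```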
When creating a new TaskBackend there are three methods which must be implemented:
    from pulsar.apps import tasks

    class TaskBackend(tasks.TaskBackend):
        ...
Once the custom task backend is implemented it must be registered:
tasks.task_backends['mybackend'] = TaskBackend
And the backend will be selected via:
List of all classes used by this application.
This application can also schedule periodic tasks when the schedule_periodic flag is True.
Starts running the task queue in monitor.
The Job class which is used in a distributed task queue.
The unique name which defines the Job and which can be used to retrieve it from the job registry. This attribute is set to the Job class name in lower case by default, unless a name class attribute is defined.
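The naming rule above (lower-cased class name unless a name class attribute is defined) can be sketched in a few lines. The Job class below is a stand-in written for this example, not pulsar's Job, and it uses __init_subclass__ where the library may well use a metaclass.

```python
class Job:
    """Stand-in base class illustrating the default naming rule."""
    name = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Keep a name declared on the class itself, otherwise derive one.
        if cls.__dict__.get('name') is None:
            cls.name = cls.__name__.lower()


class Addition(Job):
    pass


class Crawler(Job):
    name = 'web.crawler'


print(Addition.name, Crawler.name)
```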
Type of Job, one of regular and periodic.
An instance of a datetime.timedelta or None. If set, it represents the time lag after which a task which did not start expires.
Boolean indicating if this job can generate overlapping tasks. It can also be a callable which accepts the same input parameters as the job callable function.
The doc string syntax.
An instance of a logger, created at runtime.
Queue a new task in the task queue.
This utility method can be used from within the job callable method and it allows tasks to act as task factories.
Returns a Future called back with the task id.
This method invokes the TaskBackend.queue_task() method with the additional from_task argument equal to the id of the task invoking the method.
A periodic Job implementation.
If specified it must be a datetime instance. It controls when the periodic Job is run.
Returns a tuple of two items (is_due, next_time_to_run), where the next time to run is in seconds. For example:
You can override this to decide the interval at runtime.
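As an illustration of the (is_due, next_time_to_run) contract, here is a minimal sketch for a fixed interval. The function signature and the run_every parameter name are assumptions made for this example; the now argument exists only to make the behaviour easy to check.

```python
from datetime import datetime, timedelta


def is_due(last_run_at, run_every, now=None):
    """Return (is_due, next_time_to_run) with the second item in seconds."""
    now = now or datetime.now()
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining <= 0:
        # Due now: the following run is one full interval away.
        return True, run_every.total_seconds()
    # Not due yet: report how long is left until the next run.
    return False, remaining


last = datetime(2024, 1, 1, 12, 0, 0)
print(is_due(last, timedelta(seconds=60), now=datetime(2024, 1, 1, 12, 0, 30)))
print(is_due(last, timedelta(seconds=60), now=datetime(2024, 1, 1, 12, 2, 0)))
```

An override that decides the interval at runtime would compute run_every inside the function instead of receiving it as an argument.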
Site registry for tasks.
A generator of all regular jobs.
A generator of all periodic jobs.
Register a job in the job registry.
The task will be automatically instantiated if not already an instance.
Return a generator of all tasks of a specific type.
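The registry behaviour described above (registration with automatic instantiation, plus generators filtered by type) can be sketched as a dict subclass. The method names register, filter_types, regular and periodic, and the two stand-in job classes, are assumptions for this example.

```python
class JobRegistry(dict):
    """Sketch of a job registry keyed by job name."""

    def register(self, job):
        # Instantiate the job if a class was passed rather than an instance.
        if isinstance(job, type):
            job = job()
        self[job.name] = job
        return job

    def filter_types(self, job_type):
        return (job for job in self.values() if job.type == job_type)

    def regular(self):
        return self.filter_types('regular')

    def periodic(self):
        return self.filter_types('periodic')


class Addition:
    # Stand-in for a regular job.
    name = 'addition'
    type = 'regular'


class Cleanup:
    # Stand-in for a periodic job.
    name = 'cleanup'
    type = 'periodic'


registry = JobRegistry()
registry.register(Addition)      # a class: instantiated on registration
registry.register(Cleanup())     # an instance: stored as it is
print([job.name for job in registry.periodic()])
```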
A data Model containing task execution data.
Task unique identifier.
The timestamp indicating when this has finished.
The timestamp indicating when this task expires.
If the task is not started before this value it is REVOKED.
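The expiry rule can be sketched by combining the job's timeout (a datetime.timedelta or None) with the time the task was queued. The function and parameter names below are hypothetical, and timestamps are assumed to be plain floats in seconds.

```python
from datetime import timedelta


def expiry_timestamp(time_queued, timeout):
    """Return the expiry timestamp, or None when the job sets no timeout."""
    if timeout is None:
        return None
    return time_queued + timeout.total_seconds()


def should_revoke(expiry, time_started, now):
    """A task is revoked if it has not started by its expiry time."""
    return expiry is not None and time_started is None and now > expiry


expiry = expiry_timestamp(1000.0, timedelta(seconds=30))
print(expiry, should_revoke(expiry, None, 1031.0))
```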
True if this TaskBackend can schedule periodic tasks.
Passed by the task-queue application schedule-periodic setting.
The maximum number of tasks a worker will process before restarting. Passed by the task-queue application max requests setting.
The (asynchronous) timeout for polling tasks from the task queue.
It is always a positive number and it can be specified via the backend connection string:
There shouldn’t be any reason to modify the default value.
The number of tasks processed (so far) by the worker running this backend. This value is important in connection with the max_tasks attribute.
The number of concurrent_tasks.
This number is never greater than the backlog attribute.
Given an event name returns the corresponding channel name.
The event name is one of task_queued, task_started or task_done.
Try to queue a new Task.
This method returns a Future which results in the task id created. If jobname is not a valid Job.name, a TaskNotAvailable exception occurs.
a Future resulting in a task id on success.
Asynchronously wait for a task with task_id to have finished its execution.
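Waiting asynchronously for a task to finish amounts to handing out a future that is resolved when the task_done event arrives. The sketch below shows that pattern with asyncio only; DoneNotifier and its methods are hypothetical and do not reproduce the backend's publish/subscribe machinery.

```python
import asyncio


class DoneNotifier:
    """Hand out futures that resolve when a task is reported as done."""

    def __init__(self):
        self._waiters = {}

    def wait_for_task(self, task_id):
        future = self._waiters.get(task_id)
        if future is None:
            future = asyncio.get_running_loop().create_future()
            self._waiters[task_id] = future
        return future

    def task_done(self, task_id, status):
        # Resolve and forget the waiter, if anybody is waiting.
        future = self._waiters.pop(task_id, None)
        if future is not None and not future.done():
            future.set_result(status)


async def demo():
    notifier = DoneNotifier()
    waiter = notifier.wait_for_task('abc')
    # Simulate the task finishing shortly afterwards.
    asyncio.get_running_loop().call_later(
        0.01, notifier.task_done, 'abc', 'SUCCESS')
    return await waiter
```

Running asyncio.run(demo()) suspends until the simulated task_done notification resolves the future.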
Create a publish/subscribe handler from the backend store.
Asynchronously retrieve a Task from a task_id.
Parameters: task_id – the id of the task to retrieve.
Returns: a Task or None.
Invoked at the end of task execution.
The Task with task_id has been executed (either successfully or not) or has been revoked. This method performs backend-specific operations.
Must be implemented by subclasses.
Invoked by the task queue worker when it starts.
An internal method to generate task unique identifiers.
Returns a two-element tuple containing the unique id and an identifier for overlapping tasks if Job.can_overlap results in False.
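One way to produce such a pair is a random id plus a deterministic digest of the job name and its arguments. This is a sketch of the idea, assuming uuid4 and SHA-1 as building blocks; the function signature is hypothetical and the backend's actual scheme may differ.

```python
import hashlib
import uuid


def generate_task_ids(job_name, kwargs, can_overlap=True):
    """Return (task_id, overlap_id); overlap_id is None if overlap is allowed."""
    task_id = uuid.uuid4().hex
    if can_overlap:
        return task_id, None
    # Same job and same arguments give the same overlap identifier,
    # regardless of the order in which the arguments were supplied.
    args = ', '.join('%s=%r' % (key, kwargs[key]) for key in sorted(kwargs))
    text = '%s(%s)' % (job_name, args)
    return task_id, hashlib.sha1(text.encode('utf-8')).hexdigest()


first = generate_task_ids('addition', {'a': 1, 'b': 2}, can_overlap=False)
second = generate_task_ids('addition', {'b': 2, 'a': 1}, can_overlap=False)
print(first[1] == second[1])
```

A backend could then refuse to start a task whose overlap identifier matches that of a task still running.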
Called by the TaskBackend when creating a new task.
A context manager for consuming tasks.
Instances of this consumer are created by the TaskBackend when a task is executed.
The queue-based loop of the thread executing the task.
A class used as a schedule entry by the TaskBackend.
Interval in seconds
last run datetime
The scheduled last run datetime.
Returns tuple of two items (is_due, next_time_to_run), where next time to run is in seconds.
See unuk.contrib.tasks.models.PeriodicTask.is_due() for more information.
It is a JSONRPC handler which exposes six functions for executing tasks and retrieving task information.
The task-queue example shows how to use this class in the context of a WSGI server running alongside the task-queue application.
Parameters: taskqueue – instance or name of the TaskQueue application which exposes the remote procedure calls.
Return the list of Jobs registered with the task queue, with meta information.
If a list of jobnames is given, it returns only jobs included in the list.
Queue a new jobname in the task queue.
The task can be of any type as long as it is registered in the task queue registry. To check the available tasks call the rpc_job_list() function.
It returns the task id.
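From a client's point of view, queueing a task over JSON-RPC means posting a small JSON document. The sketch below only builds such a payload; it assumes JSON-RPC 2.0 framing with named parameters and a remote method called queue_task, neither of which is confirmed by this page.

```python
import json


def jsonrpc_request(method, request_id=1, **params):
    """Build a JSON-RPC 2.0 request body (framing assumed for this sketch)."""
    return json.dumps({
        'jsonrpc': '2.0',
        'id': request_id,
        'method': method,       # hypothetical remote method name
        'params': params,
    })


# Ask the server to queue the 'addition' job with two keyword arguments.
body = jsonrpc_request('queue_task', jobname='addition', a=1, b=2)
print(body)
```

The body would then be sent with any HTTP client to the URL where the RPC handler is mounted; on success the response carries the task id.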
Retrieve a task from its id.
Retrieve a list of tasks which satisfy key-valued filters.
Wait for a task to have finished.
Returns the JSON representation of the task once it has finished.
Return the approximate number of tasks in the task queue.
Internal function which returns a dictionary of parameters to be passed to the Task class constructor.
This function can be overridden to add information about the type of request, who made the request and so forth. It must return a dictionary. By default it returns an empty dictionary.