Celery primitives notes
Defining workflows
Celery workflows
The three main primitives are:
- Chain
- Group
- Chord
Chain
A chain is a list of tasks which are processed one after another. Properties:
- As soon as one task fails, the remaining tasks in the chain are not executed.
- There is no limit to how many tasks form a chain.
- A callback can be added as a new task to a chain.
If a group in a chain is followed by another task, Celery automatically upgrades that part of the chain to a chord.
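The chain properties above can be modelled without a broker. The sketch below is a plain-Python stand-in, not Celery's implementation: `run_chain`, `fail`, and `executed` are hypothetical names introduced for illustration. It assumes the behaviour of partial signatures, where the previous task's return value is prepended to the next task's arguments.

```python
from functools import partial


def run_chain(first, *rest):
    """Run steps in order, passing each result to the next step."""
    result = first()
    for step in rest:
        # An exception raised here aborts the loop, so later steps never run.
        result = step(result)
    return result


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


def fail(_value):
    raise KeyError("simulated task failure")


executed = []  # records which values reached the final step


def log_result(value):
    executed.append(value)
    return value


# Mirrors add.s(2, 2) | mul.s(4) | log_result.s()
chain_result = run_chain(partial(add, 2, 2), lambda r: mul(r, 4), log_result)

# A failing middle step stops the chain before log_result is reached.
chain_failed = False
try:
    run_chain(partial(add, 2, 2), fail, log_result)
except KeyError:
    chain_failed = True
```

After both runs, `executed` holds only the value from the successful chain, which illustrates that nothing downstream of a failed task is processed.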
Syntax for creating task chains.
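import celery

app = celery.Celery('tasks')  # assumed app instance; configure broker and backend as needed

class MyTask(celery.Task):
    # Called by the worker whenever a task using this base class fails
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@app.task(base=MyTask)
def add(x, y):
    # Fails on purpose so that on_failure is triggered
    raise KeyError()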
from celery import chain

# Creating a chain with two tasks
add.apply_async((2, 2), link=other_task.s())
# Which is the same as
s = chain(add.s(2, 2), other_task.s()).apply_async()
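# Create a chain with three tasks
s = add.s(2, 2)
m = mul.s(4)
m.link(log_result.s())  # log_result runs after mul
s.link(m)               # mul runs after add
# Note: calling s.link() twice would attach both callbacks to add instead of chaining them
# Which is the same as
s = chain(add.s(2, 2), mul.s(4), log_result.s())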
Chain error callbacks
Error callback tasks can be added to a chain in two different ways:
add.s(2, 2).on_error(log_error.s()).delay()
# Which is the same as
add.apply_async((2, 2), link_error=log_error.s())
Group
A group is a set of tasks which are executed in parallel.
from celery import group
group(add.s(2, 2), add.s(4, 4))
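As a rough illustration of what "executed in parallel" means here, the following sketch models a group with a thread pool from the standard library. It is not how Celery distributes work (Celery sends each task to a worker through the broker), and `run_group` is a hypothetical helper. It shows that the tasks are independent of each other and that the results are collected in the order the tasks were listed.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_group(calls):
    """Run independent calls concurrently and return results in submission order."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(func, *args) for func, args in calls]
        # Iterating over the futures list (not as_completed) keeps the original order.
        return [future.result() for future in futures]


# Mirrors group(add.s(2, 2), add.s(4, 4))
group_results = run_group([(add, (2, 2)), (add, (4, 4))])
```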
Group error callback
Group error callbacks are linked to each task of the group. Therefore, each failing task will call the error handling task.
It is recommended to use chords to handle group error callbacks.
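The per-task behaviour described above can be sketched in plain Python. This is a simplified model under stated assumptions, not Celery code: `run_group_with_errback`, `div`, and `log_error` are hypothetical names, the calls run sequentially for brevity, and the error handler's arguments are invented for the example rather than taken from Celery's errback signature.

```python
def run_group_with_errback(calls, errback):
    """Run every call; invoke errback once for each call that raises."""
    results = []
    for func, args in calls:
        try:
            results.append(func(*args))
        except Exception as exc:
            # The handler is attached to each task, so it fires per failure.
            errback(func.__name__, exc)
            results.append(None)
    return results


def div(x, y):
    return x / y


errors = []  # collects one entry per failing task


def log_error(task_name, exc):
    errors.append((task_name, type(exc).__name__))


outcomes = run_group_with_errback(
    [(div, (1, 0)), (div, (4, 2)), (div, (3, 0))],
    log_error,
)
```

Two of the three calls fail, so `log_error` runs twice. With many failing tasks this produces many separate error notifications, which is one reason to consider a single aggregation point instead.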
Chord
A chord consists of a group of tasks (the header) followed by a callback task, which runs only after all tasks in the group have finished successfully.
Example:
from celery import chord, group

callback_task = report_results.s()
group_tasks = [add.s(i, i) for i in range(100)]
result = chord(group_tasks)(callback_task)
result.get()
# Which is the same as
result = (group(add.s(i, i) for i in range(100)) | callback_task).apply_async()
result.get()
The result object returned by the chord refers to the callback task, so once it is ready, all tasks in the chord have finished. The callback task receives a list of all return values from the group tasks as its argument.
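The data flow of a chord can be modelled with the standard library alone. The sketch below is an approximation, not Celery's implementation: `run_chord` is a hypothetical helper, and `report_results` is assumed here to sum its input, since the notes do not say what it does.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def report_results(values):
    # Assumed behaviour for illustration: aggregate the header results.
    return sum(values)


def run_chord(header, callback):
    """Run the header calls concurrently, then pass the list of results to callback."""
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # result() re-raises a failure, so the callback is skipped if any call fails.
        values = [future.result() for future in futures]
    return callback(values)


# Mirrors chord(add.s(i, i) for i in range(100))(report_results.s())
header = [(add, (i, i)) for i in range(100)]
chord_total = run_chord(header, report_results)
```

The callback is invoked exactly once, with one list argument containing a result for every header task.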