Celery primitives notes

Defining workflows

Celery workflows

The three main primitives are:

  • Chain
  • Group
  • Chord

Chain

A chain is a list of tasks which are processed one after another. Properties:

  • As soon as one task fails, the remaining tasks in the chain are not executed.
  • There is no limit to how many tasks form a chain.
  • A callback can be added as a new task to a chain.
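The properties above can be illustrated with a small plain-Python model. This is only a sketch of the semantics (each result is passed on to the next step, and a failure stops everything after it), not Celery's implementation. `run_chain` and `fail` are hypothetical names made up for this example.

```python
from functools import partial


def run_chain(first, *rest):
    """Run callables in order, feeding each result to the next one.

    `first` takes no arguments; every later step receives the previous
    result as its only argument. An exception in any step propagates,
    so the steps after it never run.
    """
    result = first()
    for step in rest:
        result = step(result)
    return result


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


def fail(_value):
    raise KeyError('boom')


# Models add.s(2, 2) | mul.s(4): the result of add is passed to mul
# together with the argument 4.
ok = run_chain(partial(add, 2, 2), lambda r: mul(r, 4))

# A failing step stops the chain; the last step is never reached.
reached = []
try:
    run_chain(partial(add, 2, 2), fail, reached.append)
except KeyError:
    pass
```

Here `ok` ends up as 16 and `reached` stays empty, because `fail` raises before the third step is called.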

If the first task of a chain is a group, the chain is converted into a chord.

Syntax for creating task chains.

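import celery
from celery import Celery, chain

# Assumption: the application object; broker and backend settings are omitted.
app = Celery('tasks')

class MyTask(celery.Task):

    # Called by the worker when the task raises an exception.
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@app.task(base=MyTask)
def add(x, y):
    # Raises unconditionally, so on_failure above is triggered.
    raise KeyError()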

# Creating a chain with two tasks
add.apply_async((2, 2), link=other_task.s())
# Which is the same as
s = chain(add.s(2, 2) | other_task.s()).apply_async()

# Create a chain with three tasks
s = add.s(2, 2)
m = mul.s(4)
m.link(log_result.s())  # log_result receives the result of mul
s.link(m)               # mul receives the result of add
# Which is the same as
s = chain(add.s(2, 2) | mul.s(4) | log_result.s())

Chain error callbacks

Error callback tasks can be added to a chain in two different ways:

add.s(2, 2).on_error(log_error.s()).delay()
# Which is the same as
add.apply_async((2, 2), link_error=log_error.s())

Group

A group is a set of tasks which are executed in parallel.

from celery import group

group(add.s(2, 2), add.s(4, 4))
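As a rough plain-Python analogy (not Celery code), a group behaves like submitting independent calls to a pool and collecting the results in submission order. `run_group` is a hypothetical helper, and a thread pool stands in for the Celery workers.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_group(calls):
    """Submit every (func, args) pair at once and collect results in order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in calls]
        # Results are read back in submission order, not completion order.
        return [future.result() for future in futures]


# Models group(add.s(2, 2), add.s(4, 4)): the two calls are independent.
results = run_group([(add, (2, 2)), (add, (4, 4))])
```

The calls do not depend on each other, which is what allows them to run in parallel.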

Group error callback

Group error callbacks are linked to each task of the group. Therefore, each failing task will call the error handling task.
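The per-task behaviour can be sketched in plain Python. This is only a model of the idea that the error handler runs once for every failing member. `run_group_with_errback` and `div` are hypothetical names, and the calls run sequentially here to keep the sketch short.

```python
def div(x, y):
    return x / y


def run_group_with_errback(calls, errback):
    """Run every call; invoke errback once per call that raises."""
    results = []
    for func, args in calls:
        try:
            results.append(func(*args))
        except Exception as exc:
            errback(args, exc)
            results.append(None)  # placeholder for the failed call
    return results


errors = []
results = run_group_with_errback(
    [(div, (1, 0)), (div, (4, 2)), (div, (2, 0))],
    lambda args, exc: errors.append((args, type(exc).__name__)),
)
```

Two of the three calls fail, so the handler is called twice, once per failing call.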

It is recommended to use chords to handle group error callbacks.

Chord

A chord consists of a group of tasks followed by a callback task, which runs after all tasks in the group have finished successfully.

Example:

from celery import chord

callback_task = report_results.s()
group_tasks = [add.s(i, i) for i in range(100)]
result = chord(group_tasks)(callback_task)
result.get()
# Which is the same as
result = (group(add.s(i, i) for i in range(100)) | callback_task)()
result.get()

The result handle returned by the chord belongs to the callback task, so once it is ready, all tasks in the chord have finished. The callback task receives a list of the return values of the group tasks as its argument.
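A plain-Python model of this flow, again only as a sketch and not Celery's implementation: the header calls run in a pool, their return values are gathered into a list, and the callback receives that list. `run_chord` is a hypothetical helper, and the built-in `sum` stands in for `report_results`.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_chord(header, callback):
    """Run all header calls, then pass the list of results to callback."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # result() re-raises a header failure, so the callback is skipped.
        values = [future.result() for future in futures]
    return callback(values)


# Models chord(add.s(i, i) for i in range(100))(callback).
total = run_chord([(add, (i, i)) for i in range(100)], sum)
```

The callback runs exactly once, and only after every header call has returned a value.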