
I'm running multiple simulations as tasks through celery (version 2.3.2) from django. The simulations get set up by another task:

In views.py:

result = setup_simulations.delay(parameters)
request.session['sim'] = result.task_id # Store main task id

In tasks.py:

@task(priority=1)
def setup_simulations(parameters):
    task_ids = []
    for i in range(number_of_simulations):
        result = run_simulation.delay(other_parameters)
        task_ids.append(result.task_id)
    return task_ids

After the initial task (setup_simulations) has finished, I try to revoke the simulation tasks as follows:

from celery.result import AsyncResult
from celery.task.control import revoke

main_task_id = request.session['sim']
main_result = AsyncResult(main_task_id)
# Revoke sub tasks
for sub_task_id in main_result.get():
    sub_result = AsyncResult(sub_task_id)
    sub_result.revoke()  # Does not work
    # revoke(sub_task_id)  # Does not work either

When I look at the output of "python manage.py celeryd -l info", the tasks still get executed as if nothing had happened. Does anybody have an idea what could have gone wrong?

Meilo
  • Could the problem depend on the fact that I use Kombu as broker? (These “virtual transports” may have limited broadcast and event functionality. For example remote control commands only works with AMQP and Redis.) – Meilo Sep 14 '11 at 15:31

1 Answer

As you mention in your comment, revoke is a remote control command, so it is currently only supported by the amqp and redis transports.

You can accomplish this yourself by storing a revoked flag in your database and checking it at the start of the task, e.g.:

from celery import states
from celery import task
from celery.exceptions import Ignore

from myapp.models import RevokedTasks


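@task
def foo():
    # Skip the work if this task's id has been flagged as revoked.
    if RevokedTasks.objects.filter(task_id=foo.request.id).exists():
        if not foo.ignore_result:
            # Record the REVOKED state so anyone polling the result can see it.
            foo.update_state(state=states.REVOKED)
        # Ignore makes the worker leave the task state untouched.
        raise Ignore()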

If your task is working on some model you could even store a flag in that.
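
To make the flag idea concrete without depending on Django or Celery, here is a minimal sketch that uses the standard library's sqlite3 as a stand-in for the RevokedTasks table. The table name, the helper functions (make_store, mark_revoked, is_revoked) and the run_simulation stand-in are all hypothetical names chosen for illustration; in a real project the flag would live in a Django model and the check would sit at the top of the Celery task, as in the example above.

```python
import sqlite3


def make_store():
    """Create an in-memory table standing in for the RevokedTasks model (hypothetical)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE revoked_tasks (task_id TEXT PRIMARY KEY)")
    return conn


def mark_revoked(conn, task_id):
    """Called from the web side in place of revoke(); flagging twice is harmless."""
    conn.execute(
        "INSERT OR IGNORE INTO revoked_tasks (task_id) VALUES (?)", (task_id,)
    )
    conn.commit()


def is_revoked(conn, task_id):
    """Return True if the given task id has been flagged."""
    row = conn.execute(
        "SELECT 1 FROM revoked_tasks WHERE task_id = ? LIMIT 1", (task_id,)
    ).fetchone()
    return row is not None


def run_simulation(conn, task_id):
    """Stand-in for the task body: check the flag first, then do the work."""
    if is_revoked(conn, task_id):
        return "REVOKED"  # a real task would raise Ignore() here
    return "DONE"  # placeholder for the actual simulation
```

The view would call something like mark_revoked for each sub task id returned by the setup task, and each worker would consult the flag when it picks the task up. As noted in the comments below, this only helps for tasks that have not started executing yet.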

asksol
  • Thank you for confirming my suspicion about Kombu. I was not sure if that was the case, because Kombu is indeed an AMQP messaging framework. Thank you very much for your proposal as well, I guess I will go with that solution then! – Meilo Sep 15 '11 at 10:41
  • Is there a way to revoke a previously scheduled task with this workaround? – Fitoria Sep 27 '12 at 16:26
  • I updated the example to use `Ignore`, which is new in Celery 3.0.11. This makes the worker ignore the task after it returns and thus leave the state of the task alone. – asksol Sep 29 '12 at 10:48
  • @Fitoria What do you mean by previously scheduled? Any task the worker receives can be ignored this way. You can use the same technique to e.g. change the ETA/countdown of a task after it's been sent, by ignoring the task and sending a new task with a new ETA (it doesn't help if the task is already executing, of course). – asksol Sep 29 '12 at 10:49
  • @asksol What's the best way to add a field on celery task table? http://stackoverflow.com/questions/25046200/celery-revoke-task-before-execute-using-django-database – Pietro Aug 02 '14 at 09:11