In my case, I had issues with the accepted answers:
- Raising Ignore()orReject()from within the task led to the task being correctly inFAILUREstate, but when running a Celery workflow (thinkchain,chord, orgroup) containing this failing task would always result in the workflow hanging:
workflow = chain(my_task.si([], *args, **kwargs), other_task.s(*args, **kwargs))
res = workflow()
results = res.get() # hangs as the workflow never enters the ready state
- I wanted the rest of the workflow to still run even if one of the tasks failed (not propagate exceptions, or have global error handlers that are difficult to work with)
So I ended up always marking the task as success and doing my own error post-processing after the workflow ends (always successfully):
import traceback
def my_task(prev, arg1, arg2, opts={}):
  results = []
  state = {
    'state': task_state,
    'meta': {
      'custom_attr': 'test',
      # task metadata as needed
    }
  }
  try:
    # task code goes here
    task_state = 'SUCCESS'
    task_exc = None
  except BaseException as exc:
    task_state = 'FAILURE'
    task_exc = exc
  finally:
    state['state'] = 'SUCCESS'
    if task_state == 'FAILURE':
      exc_str = ' '.join(traceback.format_exception(
        etype=type(task_exc),
        value=task_exc,
        tb=task_exc.__traceback__))
      state['meta']['error'] = exc_str
    # Update task state with final status
    self.update_state(**state)
    return results
This has the advantage of:
- Keeping all the needed exception data (traceback)
- Avoiding the weird Celery post-processing of task states:
- when tasks fail Celery replaces the dict data by the exception instance, which prevents having a consistent way of accessing task metadata
- updating the FAILUREstate without raisingIgnore()orReject()always result in task state beingSUCCESS...
 
- Make complex workflows way more resilient to failures.
This allows me to always process workflow results in the following way:
info = res.get() # this is always a dict containing metadata
error = info.get('error')
results = info['results']
custom_attr = info['custom_attr']