In all of the similar problems I've seen on Stack Overflow and elsewhere:
- Celery Received unregistered task of type (run example)
- getting error Received unregistered task of type 'mytasks.add'
- Received unregistered task for celery
- https://serverfault.com/questions/416888/celery-daemon-receives-unregistered-tasks
- https://github.com/duointeractive/sea-cucumber/issues/15
the error states the name of the task that's unregistered. I'm having a different issue. The name of the task isn't being displayed, but rather Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, resulting in KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.
Here's my traceback:
KeyError: <AsyncResult: 4aca05f8-14c6-4a25-988a-ff605a27871d>
[2016-06-15 14:11:46,016: ERROR/MainProcess] Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': 'a6e8d1c0-c75b-471e-b21f-af8492592aeb', 'kwargs': {}, 'eta': None, 'id': '0dffed5f-3090-417c-a9ec-c99e11bc9579'} (568b)
Traceback (most recent call last):
  File "/Users/me/Developer/virtualenvironments/project_name/lib/python2.7/site-packages/celery/worker/consumer.py", line 456, in on_task_received
    strategies[name](message, body,
KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>
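One detail in the message body above seems worth isolating: the 'task' field holds an AsyncResult object instead of a string task name, which is why the registry lookup raises KeyError on the object itself. The check can be sketched in plain Python as below. This is only an illustration: `task_field_problem` is a hypothetical helper name, the `AsyncResult` class is a stand-in so the snippet runs without Celery installed, and the dicts are trimmed versions of the logged body.

```python
try:
    string_types = basestring  # Python 2
except NameError:
    string_types = str  # Python 3


def task_field_problem(body):
    """Describe the problem if body['task'] is not a string name, else return None."""
    task = body.get('task')
    if isinstance(task, string_types):
        return None
    return "'task' is a %s, not a string name" % type(task).__name__


class AsyncResult(object):
    """Stand-in for celery.result.AsyncResult (assumption: only used for this demo)."""


# A body as I would expect it, with the registered task name as a string.
good_body = {'task': 'app.tasks.assets.list_assets', 'args': [], 'kwargs': {}}
# A body shaped like the one in the log, with an object in the 'task' field.
bad_body = {'task': AsyncResult(), 'args': [], 'kwargs': {}}

print(task_field_problem(good_body))
print(task_field_problem(bad_body))
```

So whatever publishes these messages appears to be passing a result object where a task or signature is expected.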
My Celery app includes the module that defines my only three tasks:
app/celery_app.py:
from celery import Celery

celery_app = Celery('app',
                    broker='amqp://ip',    # RabbitMQ
                    backend='redis://ip',  # Redis
                    include=['app.tasks.assets'])

celery_app.conf.update(
    CELERY_DEFAULT_QUEUE='local_testing',
    CELERY_TASK_RESULT_EXPIRES=86400,  # 24 hours
    CELERY_ROUTES={
        'app.tasks.assets.list_assets': {'queue': 'gatherAPI'},
        'app.tasks.assets.massage_assets': {'queue': 'computation'},
        'app.tasks.assets.save_assets': {'queue': 'database_writes'},
    }
)
app/tasks/assets.py:
from __future__ import absolute_import

from celery import current_app

@current_app.task(name='app.tasks.assets.list_assets')
def list_assets(*args, **kwargs):
    print "list assets"

@current_app.task(name='app.tasks.assets.massage_assets')
def massage_assets(assets):
    print "massaging assets"

@current_app.task(name='app.tasks.assets.save_assets', ignore_result=True)
def save_assets(assets):
    print "saving assets..."
These errors occur only in the queues "celery" (which I'm not using) and "local_testing".
The queues that the tasks are routed to receive their messages, and the tasks there print their output and work as intended. Somehow, though, the queues named "celery" and "local_testing" are also filling up (both to the same size), and workers consuming them produce nothing but this traceback over and over again.
Here's how I'm calling the tasks...
app/processes/processes.py:
from celery import group

class Process(object):
    def run_process(self, resource_generator, chain_signature):
        tasks = []
        for resources in resource_generator:
            tasks.append(chain_signature(resources))
        group(tasks)()
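A possible explanation worth checking is that `chain_signature(resources)` calls the chain, which dispatches it and returns a result object, so `tasks` ends up holding results instead of signatures by the time it reaches `group`. The sketch below only models that distinction. `FakeSignature` and `FakeResult` are hypothetical stand-ins, not Celery classes, and the assumption that calling dispatches while `clone(args=...)` returns an unsent copy would need to be confirmed against the installed Celery version.

```python
class FakeResult(object):
    """Hypothetical stand-in for the handle returned once work has been dispatched."""
    def __init__(self, task_id):
        self.id = task_id


class FakeSignature(dict):
    """Hypothetical stand-in for a signature: a dict describing a call not yet sent."""
    def __init__(self, task, args=()):
        super(FakeSignature, self).__init__(task=task, args=tuple(args))

    def __call__(self, *args):
        # Modeled assumption: calling sends the work and returns a result handle.
        return FakeResult('dispatched:%s' % self['task'])

    def clone(self, args=()):
        # Returns a new, still unsent signature with the extra args prepended.
        return FakeSignature(self['task'], tuple(args) + self['args'])


chain_signature = FakeSignature('app.tasks.assets.list_assets')
resources = ['asset-1', 'asset-2']

# What run_process appends today: the return value of calling the signature.
called = chain_signature(resources)
# An alternative: an unsent copy of the signature bound to the resources.
cloned = chain_signature.clone(args=(resources,))

print(type(called).__name__)
print(type(cloned).__name__)
```

If the real classes behave like this model, the list handed to `group` would contain result handles, which would match the AsyncResult showing up in the 'task' field of the message body.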
app/processes/assets.py:
from __future__ import absolute_import

from app.processes.processes import Process
from app.indexes.asset import AssetIndex
from app.tasks.assets import *

class AssetProcess(Process):
    def run(self):
        Process.run_process(self,
                            resource_generator=AssetIndex.asset_generator(),
                            chain_signature=(
                                list_assets.s() |
                                massage_assets.s() |
                                save_assets.s()))
Again, the default queue is set to "local_testing", so I'm not sure how anything's being piped to the "celery" queue. The traceback I'm getting is also fairly unhelpful.
I'm launching the Celery worker from the directory above app/, either on the default "celery" queue (as shown) or on the local_testing queue (by adding -Q local_testing), like so:
celery -A app.celery_app worker -l info -n worker3.%h
Any help is greatly appreciated.
Cheers!