In all of the similar problems I've seen on Stack Overflow, the error states the name of the task that's unregistered. I'm having a different issue: the task name isn't being displayed. Instead I get Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, resulting in KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.

Here's my traceback:

KeyError: <AsyncResult: 4aca05f8-14c6-4a25-988a-ff605a27871d>
[2016-06-15 14:11:46,016: ERROR/MainProcess] Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': 'a6e8d1c0-c75b-471e-b21f-af8492592aeb', 'kwargs': {}, 'eta': None, 'id': '0dffed5f-3090-417c-a9ec-c99e11bc9579'} (568b)
Traceback (most recent call last):
File "/Users/me/Developer/virtualenvironments/project_name/lib/python2.7/site-packages/celery/worker/consumer.py", line 456, in on_task_received
strategies[name](message, body,
KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>

My Celery app includes the file where my only three tasks are defined:

app/celery_app.py:

from celery import Celery

celery_app = Celery('app',
                    broker='amqp://ip',    # RabbitMQ
                    backend='redis://ip',  # Redis
                    include=['app.tasks.assets'])

celery_app.conf.update(
    CELERY_DEFAULT_QUEUE='local_testing',
    CELERY_TASK_RESULT_EXPIRES=86400,  # 24 hours
    CELERY_ROUTES={
        'app.tasks.assets.list_assets': {'queue': 'gatherAPI'},
        'app.tasks.assets.massage_assets': {'queue': 'computation'},
        'app.tasks.assets.save_assets': {'queue': 'database_writes'},
    },
)

app/tasks/assets.py:

from __future__ import absolute_import
from celery import current_app

@current_app.task(name='app.tasks.assets.list_assets')
def list_assets(*args, **kwargs):
    print "list assets"

@current_app.task(name='app.tasks.assets.massage_assets')
def massage_assets(assets):
    print "massaging assets"

@current_app.task(name='app.tasks.assets.save_assets', ignore_result=True)
def save_assets(assets):
    print "saving assets..."

These errors occur only in the queues "celery" (which I'm not using) and "local_testing".

The appropriate queues for all of these tasks print their output and work as intended, but somehow the queues named "celery" and "local_testing" are filling up (to the same size) and producing nothing but this traceback over and over again.

Here's how I'm calling the tasks...

app/processes/processes.py:

from celery import group
class Process(object):
    def run_process(self, resource_generator, chain_signature):
        tasks = []
        for resources in resource_generator:
            tasks.append(chain_signature(resources))
        group(tasks)()

app/processes/assets.py:

from __future__ import absolute_import

from app.processes.processes import Process
from app.indexes.asset import AssetIndex
from app.tasks.assets import *

class AssetProcess(Process):
    def run(self):
        Process.run_process(self,
                            resource_generator=AssetIndex.asset_generator(),
                            chain_signature=(
                                list_assets.s() | 
                                massage_assets.s() | 
                                save_assets.s()))

Again, the default queue is set to "local_testing", so I'm not sure how anything's being piped to the "celery" queue. The traceback I'm getting is also fairly unhelpful.
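
For reference, one quick way to confirm the tasks are registered (assuming app.celery_app is importable from the current directory) is to print the app's task registry:

from app.celery_app import celery_app

# celery_app.tasks maps task names to task objects; filter out Celery's built-ins.
print sorted(name for name in celery_app.tasks if not name.startswith('celery.'))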

I'm launching the Celery worker (either with the default "celery" queue, or with the local_testing queue via -Q local_testing) from the directory above app/, like so:

celery -A app.celery_app worker -l info -n worker3.%h

Any help is greatly appreciated.

Cheers!


1 Answer

I've determined the problem, and it comes from using group.

Calling the chain signature with an argument applies it asynchronously right away, so each call returns an AsyncResult. By using group, I was grouping AsyncResult objects rather than signatures, which doesn't make any sense (and explains why the worker saw an AsyncResult where a task name belonged). I've altered the execution as follows:

def run_process(self, resource_generator, chain_signature):
    for resources in resource_generator:
        chain_signature(resources)

This effectively does what I wanted anyway.
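
For reference, if grouping the chains were still desired, a minimal sketch (assuming the standard Signature.clone, which binds arguments without dispatching the task) might look like:

from celery import group

def run_process(self, resource_generator, chain_signature):
    # clone() binds args to the chain without applying it, so group()
    # receives real signatures instead of AsyncResult objects.
    tasks = [chain_signature.clone(args=(resources,))
             for resources in resource_generator]
    group(tasks)()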

Cheers
