
After many tests and searches I have not found a solution, and I hope you can guide me.
My code is available at this GitHub address.
Because the main code is complex, I have written a simple example that reproduces the same problem and linked it at the address above.
I have a worker that contains four app.tasks with the following names:

  • app_1000
  • app_1002
  • app_1004
  • app_1006

Each of these app.tasks should run only once at a time; for example, app_1000 must not be running two or three times simultaneously. Only when the current app_1000 task finishes may the worker move on to the next one.
Celery config:

broker_url='amqp://guest@localhost//'
result_backend='rpc://'
include=['celery_app.tasks']
worker_prefetch_multiplier = 1

task_routes={
    'celery_app.tasks.app_1000':{'queue':'q_app'},
    'celery_app.tasks.app_1002':{'queue':'q_app'},
    'celery_app.tasks.app_1004':{'queue':'q_app'},
    'celery_app.tasks.app_1006':{'queue':'q_app'},
    'celery_app.tasks.app_timeout':{'queue':'q_timeout'},
}

As you can see, worker_prefetch_multiplier = 1 is set in the configuration above.
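For reference, a minimal sketch of how the Celery app could be created from this configuration (assuming the settings above live in a celeryconfig.py module inside the celery_app package; the module path is an assumption, not taken from the repository):

from celery import Celery

# Create the app and load the configuration shown above.
# 'celery_app.celeryconfig' is a hypothetical module path.
app = Celery('celery_app')
app.config_from_object('celery_app.celeryconfig')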


I use FastAPI to send the request; a sample request is shown below (to simplify the question, I only send the number of tasks that this worker must execute through FastAPI).

[screenshot: sample request sent from Postman]
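For reference, a rough sketch of what such an endpoint could look like (the route, parameter name, and dispatch logic are illustrative assumptions, not the code from the linked repository):

# api.py -- hypothetical sketch; only the module:app layout matches the uvicorn command below
from fastapi import FastAPI

from celery_app.tasks import app_1000, app_1002, app_1004, app_1006

app = FastAPI()
TASKS = [app_1000, app_1002, app_1004, app_1006]


@app.post("/send")
def send_tasks(count: int):
    # Dispatch `count` tasks, cycling over the four task types.
    # The task argument (the loop index) is purely illustrative.
    for i in range(count):
        TASKS[i % len(TASKS)].delay(i)
    return {"sent": count}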


I also use Flower to monitor the tasks.
After pressing the Send button in Postman, all 20 hypothetical tasks are sent to the worker, and at first everything is fine because each of the app.tasks has started exactly one task.
[screenshot: Flower showing each app.task running once]


But a few minutes later, as the work progresses, the same app.tasks start running simultaneously; for example, in the screenshot below app_1000 has been started twice, and in the next screenshot app_1006 has been started twice, both instances running at the same time. I do not want this to happen.
[screenshot: Flower showing app_1000 started twice]
A few moments later: [screenshot: Flower showing app_1006 started twice]


I expect app_1000 or app_1006 to run only one instance at a time, but I don't know how to achieve this.

Important note: please do not suggest creating 4 queues for the 4 app.tasks, because in my real project I have more than 100 app.tasks and managing that many queues is very difficult.
You may ask why app_1000, for example, must not run simultaneously. The answer is complicated and would require explaining the main code, which is extensive, so please skip this question.
The code is on GitHub (it is small and will not take much of your time). If you want to run it, you can use the following commands:

celery -A celery_app worker -Q q_app --loglevel=INFO --concurrency=4 -n worker@%h
celery flower --port=5566
uvicorn api:app --reload

thank you
Sardar

2 Answers


Unfortunately, Celery doesn't provide an out-of-the-box solution for this. You have to implement a distributed cache locking mechanism and check the lock before executing the task. A similar question and related answers are present here.
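For illustration, a minimal sketch of such a lock using Redis (assuming a Redis server on localhost:6379; the key name, TTL, retry policy, and import path are illustrative, and the lock release is simplified):

import redis
from celery_app.celery import app  # hypothetical import path for the Celery app

redis_client = redis.Redis(host='localhost', port=6379)
LOCK_TTL = 600  # seconds; should exceed the longest expected run time of the task


@app.task(bind=True, max_retries=None)
def app_1000(self, arg):
    lock_key = 'lock:app_1000'
    # SET with nx=True acquires the lock atomically only if the key does not exist yet.
    if not redis_client.set(lock_key, self.request.id, nx=True, ex=LOCK_TTL):
        # Another app_1000 is still running; retry later instead of running concurrently.
        raise self.retry(countdown=10)
    try:
        ...  # the actual work goes here
    finally:
        # Simplified release: a production version should check it still owns the lock.
        redis_client.delete(lock_key)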

Lemon Reddy
  • Seems like it's possible. Check out this. I haven't used Airflow. https://stackoverflow.com/questions/51600181/airflow-locking-between-tasks-so-that-only-one-parallel-task-runs-at-a-time – Lemon Reddy Aug 09 '23 at 16:10

You can try to use a lock inside the task, something like:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def app_1000(self, arg):
    # Use a lock to ensure no other instance of this task is running
    # (you can include args in lock_id to make the lock per-argument)
    lock_id = 'lock-1000'
    with cache_lock(lock_id, self.app.oid) as acquired:

        # Skip if another task already holds the lock
        if not acquired:
            logger.debug("Skip task: there's another task doing the same.")
            return False

        # your task here ...

where cache_lock is:

from contextlib import contextmanager
from time import monotonic

from django.core.cache import cache

LOCK_EXPIRE = 60 * 10  # lock expires after 10 minutes


@contextmanager
def cache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

Rafen
  • Thanks, I saw a sample code like this in the Celery documentation, but this method does not work for me and we do not use Django. – Sardar Aug 09 '23 at 16:54