I would like to define tasks that are triggered by messages from a broker (e.g. SQS). These tasks should be able to publish new messages, which in turn trigger other tasks.
While this is partially solved by libraries like Celery/Selinon, one extra requirement I have is that I would like to debounce certain expensive tasks.
Here is an example of how I would want to define this:
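```python
from typing import Dict, List

# Hypothetical API: `task`, `publish`, `inexpensive_operation` and `expensive`
# are what I'd like to have; they don't come from an existing library.

@task(event_type="Task1")
def task1(message: Dict):
    # Queue Task2 to be called
    publish({"type": "Task2"})
    return inexpensive_operation(message)

@task(event_type="Task2")
def task2(message: Dict):
    return inexpensive_operation(message)

@task(event_type="Task3", run_after=["Task1", "Task2"], debounce=1000)  # debounce window, e.g. 1000 ms
def task3(messages: List[Dict]):
    # Called once with every message that triggered it during the window
    return expensive(messages)
```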
In the above code, Task3 should run after either Task1 or Task2 has finished. If Task1 and Task2 are triggered at the same time, Task3 could be called three times: once for Task1 and once for each of the two Task2 runs (Task1 also queues a Task2). Instead, I would like Task3 to be called once and be passed all of the messages that triggered it.
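The debouncing described here is trailing-edge debouncing: buffer incoming messages and release them as one batch once no new message has arrived for the window. A minimal single-process sketch follows; the `Debouncer` class, its `add`/`poll` methods, and the injectable `clock` are hypothetical names chosen for illustration.

```python
import time
from typing import Callable, Dict, List, Optional


class Debouncer:
    """Trailing-edge debounce: buffer messages and release them as one batch
    once no new message has arrived for `window_ms` milliseconds."""

    def __init__(self, window_ms: int, clock: Callable[[], float] = time.monotonic):
        self.window_s = window_ms / 1000
        self.clock = clock  # injectable so the behaviour can be tested deterministically
        self.pending: List[Dict] = []
        self.last_seen: Optional[float] = None

    def add(self, message: Dict) -> None:
        self.pending.append(message)
        self.last_seen = self.clock()  # each new message restarts the quiet window

    def poll(self) -> Optional[List[Dict]]:
        """Return the whole batch if the window has passed since the last message, else None."""
        if self.pending and self.clock() - self.last_seen >= self.window_s:
            batch, self.pending = self.pending, []
            return batch
        return None


# Simulated clock: Task1, Task2 and the Task2 queued by Task1 arrive within 0.5 s.
now = [0.0]
d = Debouncer(window_ms=1000, clock=lambda: now[0])
d.add({"type": "Task1"})
now[0] = 0.25
d.add({"type": "Task2"})
now[0] = 0.5
d.add({"type": "Task2"})
now[0] = 1.0
first = d.poll()   # None: only 0.5 s since the last message
now[0] = 1.5
second = d.poll()  # all three messages, released together
print(first, [m["type"] for m in second])
```

In a consumer, `poll()` would be called periodically (for example, between broker receives). The buffer lives in process memory, so this does not cover workers running on different machines.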
Lastly, these tasks may run in distributed worker processes, which would likely require a shared database for coordination.
I know I could probably implement my own debouncing consumer using a DB with atomic writes, but this feels like a non-trivial problem with many edge cases.
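A rough sketch of what the "database with atomic writes" approach might look like, using `sqlite3` as a stand-in for the shared database. The `pending` table and the `connect`/`enqueue`/`claim_batch` helpers are hypothetical. `BEGIN IMMEDIATE` takes SQLite's write lock, so only one worker at a time can check the quiet window and claim the batch.

```python
import json
import sqlite3
from typing import Dict, List, Optional


def connect(path: str) -> sqlite3.Connection:
    # isolation_level=None disables implicit transactions so we can issue BEGIN IMMEDIATE.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pending ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " task TEXT NOT NULL, payload TEXT NOT NULL, created_at REAL NOT NULL)"
    )
    return conn


def enqueue(conn: sqlite3.Connection, task: str, message: Dict, now: float) -> None:
    # Record a triggering message for a debounced task.
    conn.execute(
        "INSERT INTO pending (task, payload, created_at) VALUES (?, ?, ?)",
        (task, json.dumps(message), now),
    )


def claim_batch(conn: sqlite3.Connection, task: str, window_s: float, now: float) -> Optional[List[Dict]]:
    # Atomically claim all pending messages for `task` if it has been quiet for `window_s`.
    conn.execute("BEGIN IMMEDIATE")  # write lock: other writers wait until COMMIT
    try:
        (last,) = conn.execute(
            "SELECT MAX(created_at) FROM pending WHERE task = ?", (task,)
        ).fetchone()
        if last is None or now - last < window_s:
            conn.execute("COMMIT")
            return None  # nothing pending, or still inside the debounce window
        rows = conn.execute(
            "SELECT payload FROM pending WHERE task = ? ORDER BY id", (task,)
        ).fetchall()
        conn.execute("DELETE FROM pending WHERE task = ?", (task,))
        conn.execute("COMMIT")
        return [json.loads(payload) for (payload,) in rows]
    except Exception:
        conn.execute("ROLLBACK")
        raise


conn = connect(":memory:")
enqueue(conn, "Task3", {"type": "Task1"}, now=0.0)
enqueue(conn, "Task3", {"type": "Task2"}, now=0.5)
early = claim_batch(conn, "Task3", window_s=1.0, now=1.0)  # None: last write 0.5 s ago
batch = claim_batch(conn, "Task3", window_s=1.0, now=1.5)  # both messages
again = claim_batch(conn, "Task3", window_s=1.0, now=2.0)  # None: already claimed
```

This illustrates the edge cases: rows are deleted at claim time, so a worker that crashes before running `expensive` loses the batch (a lease or `claimed_at` column would be needed), and timestamps are passed in explicitly here only to keep the demo deterministic; real workers would need a single clock source to avoid skew.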
Are there any libraries that help coordinate these more complex dependency scenarios?