
I would like to define some tasks that can be triggered by messages from a broker (e.g. SQS). These tasks would be able to re-publish messages that trigger other tasks.

While this is partially solved by libraries like Celery/Selinon, one extra requirement I have is that I would like to debounce certain expensive tasks.

Here is an example of how I would want to define this:

@task(event_type="Task1")
def task1(message: Dict):
  # Queue Task2 to be called
  publish({"type": "Task2"})
  return inexpensive_operation(message)

@task(event_type="Task2")
def task2(message: Dict):
  return inexpensive_operation(message)

@task(event_type="Task3" run_after=["Task1", "Task2"], debounce=1000)
def task3(messages: List[Dict]):
  return expensive(messages)

In the above code, Task3 runs after either Task1 or Task2 has finished. If Task1 and Task2 get called at around the same time, Task3 could be triggered up to three times (by Task1, by the original Task2, and by the Task2 that Task1 republishes). Instead, I would like Task3 to be called once and be passed the various messages that triggered its run.

Lastly, these tasks may be called on distributed processes potentially requiring some coordinating DB.

I know I could probably implement my own debouncing consumer using a DB with atomic writes, but this feels like a non-trivial problem with many edge cases.
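For concreteness, the kind of hand-rolled debouncing consumer I have in mind would look roughly like the sketch below, using Redis as the coordinating store (the key names, polling loop, and one-second window are all placeholders I made up):

import json
import time
from typing import Dict, List

import redis

r = redis.Redis(decode_responses=True)  # coordinating store (placeholder)

BUFFER_KEY = "task3:buffer"      # accumulates the messages that triggered Task3
DEADLINE_KEY = "task3:deadline"  # unix timestamp after which Task3 may fire
DEBOUNCE_SECONDS = 1.0


def on_trigger(message: Dict) -> None:
    # Called whenever Task1 or Task2 finishes: buffer the message and
    # push the debounce deadline forward, atomically.
    pipe = r.pipeline()
    pipe.rpush(BUFFER_KEY, json.dumps(message))
    pipe.set(DEADLINE_KEY, time.time() + DEBOUNCE_SECONDS)
    pipe.execute()


def maybe_run_task3(expensive) -> None:
    # Polled periodically by a single worker: once the deadline has passed
    # with no new triggers, drain the buffer and run the expensive task once.
    deadline = r.get(DEADLINE_KEY)
    if deadline is None or time.time() < float(deadline):
        return
    pipe = r.pipeline()
    pipe.lrange(BUFFER_KEY, 0, -1)
    pipe.delete(BUFFER_KEY, DEADLINE_KEY)
    raw_messages, _ = pipe.execute()
    messages: List[Dict] = [json.loads(m) for m in raw_messages]
    if messages:
        expensive(messages)

Every trigger pushes the deadline forward, so the expensive task only fires once the triggers have gone quiet for the debounce window, and it receives all of the buffered messages in one call. But this already glosses over failure handling, multiple polling workers, and at-least-once delivery, which is why I'd rather not build it myself.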

Are there any libraries that help coordinate these more complex dependency scenarios?

rbhalla
  • Am not aware of any libraries specific to this use-case; I'd probably roll my own. However, I did find a couple references that might help, depending on how you want the "debounce" to work: https://stackoverflow.com/questions/12003221/celery-task-schedule-ensuring-a-task-is-only-executed-one-at-a-time has a couple options that might work for you. Also, Python's threading library ( https://docs.python.org/3/library/threading.html#lock-objects ) has several variants on the Lock class. You could also maintain a DB record / cache to accumulate params and execute after the "debounce" period. – Sarah Messer Mar 06 '23 at 23:09

1 Answer


You should be able to do this with Prefect. You can define your tasks with its task decorator and express the dependencies between them, and you can apply a debounce on the decorator to set a minimum interval between executions, so the expensive task runs at most once per interval even though the tasks that trigger it are called more frequently.

A very basic and rough example of this using AWS would look like this:

import prefect
from prefect import task, Flow, Parameter
from prefect.tasks.aws.sqs import SQSMessage
from datetime import timedelta


@task
def publish_task_message(event_type: str):
    # Publish a message to the SQS queue
    sqs_message = SQSMessage(
        queue_url="< your queue url here >",
        message_body={"type": event_type},
    )
    sqs_message.run()


@task
def task1(message: dict):
    # Queue Task2 to be called
    publish_task_message("Task2")
    return "Task1 output"


@task
def task2(message: dict):
    return "Task2 output"


@task(debounce=timedelta(seconds=5))
def task3(messages: list):
    # This task will only execute if it has not been called in the last 5 seconds
    return "Task3 output"


with Flow("my-flow") as flow:
    # Define the flow
    message = Parameter("message")
    task1_output = task1(message)
    task2_output = task2.map(task1_output)
    task3_input = task1_output | task2_output
    task3_output = task3(task3_input)

flow.run(parameters={"message": {"type": "Task1"}})

Here task1 and task2 are driven by the incoming message (which you would feed in from the SQS queue), while task3 runs after both of them and receives their outputs.
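One thing to note: the example above doesn't read from the queue by itself, so you would still need a small consumer loop that turns incoming SQS messages into flow runs. A rough sketch of that using boto3 (the queue URL and polling settings are placeholders) could be:

import json

import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "< your queue url here >"

while True:
    # Long-poll the queue for a batch of messages
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
    )
    for record in response.get("Messages", []):
        body = json.loads(record["Body"])
        # Start a flow run with the incoming message as its parameter
        flow.run(parameters={"message": body})
        # Remove the message from the queue once the run has been kicked off
        sqs.delete_message(
            QueueUrl=QUEUE_URL,
            ReceiptHandle=record["ReceiptHandle"],
        )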

Tendekai Muchenje
  • I'm not too familiar with Prefect, but I can't seem to see any reference to `debounce` in their [docs](https://docs.prefect.io/api-ref/prefect/tasks/), am I missing it? Also, in my scenario, I would expect Task3 to be called after Task1 or Task2 are called (regardless of how they are called). I'm not sure how a `Flow` approach can address that in the same way? – rbhalla Mar 09 '23 at 00:02