Note
- Please read and understand the question thoroughly
- It cannot be solved by a simple BranchPythonOperator / ShortCircuitOperator
We have an unusual multiplexer-like use-case in our workflow
                  +-----------------------+
                  |                       |
   +------------->| branch-1.begin-task   |
   |              |                       |
   |              +-----------------------+
   |
   |              +-----------------------+
   |              |                       |
   +------------->| branch-2.begin-task   |
   |              |                       |
+--+---------+    +-----------------------+
|            |
|  MUX-task  |              .
|            |              .
+--+---------+              .
   |
   |              +-----------------------+
   |              |                       |
   +------------->| branch-n.begin-task   |
                  |                       |
                  +-----------------------+
The flow is expected to work as follows:
- MUX-task listens for events on an external queue (a single queue)
- each event on the queue triggers execution of one of the branches (branch-n.begin-task)
- one-by-one, as events arrive, the MUX-task must trigger execution of the respective branch
- once all branches have been triggered, the MUX-task completes
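To make the intent concrete, here is the behaviour we want from MUX-task, written as plain Python (just a sketch: the queue-message format is an assumption, and trigger_branch is a hypothetical primitive; the absence of exactly such a primitive in Airflow is our problem):

```python
import boto3


def trigger_branch(branch_id):
    # HYPOTHETICAL: Airflow has no primitive that lets a running task
    # start one downstream branch at a time while leaving the rest pending
    print("would trigger {}".format(branch_id))


def mux_task(queue_url, n):
    """Desired MUX-task behaviour, expressed outside of Airflow."""
    sqs = boto3.client("sqs")
    triggered = set()
    while len(triggered) < n:  # exactly n events are expected
        resp = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=20,  # long-poll the single queue
        )
        for msg in resp.get("Messages", []):
            branch_id = msg["Body"]  # assume the body identifies the branch
            trigger_branch(branch_id)  # fire that branch, leave the rest pending
            triggered.add(branch_id)
            sqs.delete_message(QueueUrl=queue_url,
                               ReceiptHandle=msg["ReceiptHandle"])
```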
Assumptions
- Exactly n events arrive on the queue, one for triggering each branch
- n is dynamically known: its value is defined in a Variable
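Reading n and deriving the branch list is the easy part (the Variable name mux_branch_count below is just an example):

```python
from airflow.models import Variable

# n is maintained externally in an Airflow Variable
n = int(Variable.get("mux_branch_count"))
branch_names = ["branch-{}.begin-task".format(i + 1) for i in range(n)]
```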
Limitations
- There is only a single external queue on which events arrive
- we can't have n queues (one per branch) since the set of branches grows over time (n is dynamically defined)
We have not been able to come up with a solution using Airflow's set of operators and sensors (or anything else available out-of-the-box in Airflow) to build this:
- Sensors can be used to listen for events on the external queue; but we have to listen for multiple events, not one
- BranchPythonOperator can be used to trigger execution of a single branch out of many, but it immediately marks the remaining branches as skipped
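A minimal DAG (Airflow 1.x imports) demonstrating the second point: whichever task_id the callable returns gets executed, and every other downstream branch is marked skipped right away, so the operator cannot be re-used to fire the remaining branches later:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

with DAG(dag_id="branch_skip_demo",
         start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:
    # BranchPythonOperator follows exactly ONE returned task_id;
    # all other direct downstream tasks are immediately set to 'skipped'
    mux = BranchPythonOperator(
        task_id="mux_task",
        python_callable=lambda: "branch_1_begin",
    )
    for i in range(1, 4):
        mux >> DummyOperator(task_id="branch_{}_begin".format(i))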
Primary bottleneck
Because of the 2nd limitation above, even a custom operator combining the functionality of a Sensor and a BranchPythonOperator won't work.
We have tried to brainstorm around fancy combinations of Sensors, DummyOperators and trigger_rules to achieve this (one variant is sketched below), but have had no success thus far.
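For concreteness, one variant of such an attempt: a per-branch sensor, every sensor poking the same queue. This is only a sketch (queue_url and the message-body convention are assumptions), and it shows why the single-queue limitation bites: a sensor that receives another branch's event would have to re-queue it, racing against all the other sensors:

```python
import boto3
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class BranchEventSensor(BaseSensorOperator):
    """One sensor per branch, all poking the SAME external queue."""

    @apply_defaults
    def __init__(self, queue_url, branch_id, *args, **kwargs):
        super(BranchEventSensor, self).__init__(*args, **kwargs)
        self.queue_url = queue_url
        self.branch_id = branch_id

    def poke(self, context):
        sqs = boto3.client("sqs")
        resp = sqs.receive_message(QueueUrl=self.queue_url,
                                   MaxNumberOfMessages=1)
        for msg in resp.get("Messages", []):
            if msg["Body"] == self.branch_id:
                sqs.delete_message(QueueUrl=self.queue_url,
                                   ReceiptHandle=msg["ReceiptHandle"])
                return True
            # the event belongs to another branch: it would have to be
            # re-queued here, racing with all the other sensors
        return False
```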
Is this doable in Airflow?
UPDATE-1
Here's some background info to understand the context of the workflow
- we have an ETL pipeline to sync MySQL tables (across multiple Aurora databases) to our data-lake
- to overcome the impact of our sync pipeline on the production databases, we have decided to do this (see the sketch after this list):
  - for each database, create a snapshot (restore an Aurora DB cluster from the last backup)
  - run the MySQL sync pipeline using that snapshot
  - at the end of the sync, terminate the snapshot (Aurora DB cluster)
- the snapshot lifecycle events of the Aurora snapshot-restore process are published to an SQS queue: a single queue for all databases
- this setup was done by our DevOps team (different AWS account; we don't have access to the underlying Lambdas / SQS / infra)
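For illustration, the per-database lifecycle amounts to something like the following (we don't own this code; the boto3 calls are real RDS APIs, but the identifiers are made up):

```python
import boto3

rds = boto3.client("rds")

# restore a throwaway Aurora cluster from the latest backup snapshot
rds.restore_db_cluster_from_snapshot(
    DBClusterIdentifier="db1-sync-snapshot",   # hypothetical identifier
    SnapshotIdentifier="db1-latest-backup",    # hypothetical identifier
    Engine="aurora",
)

# ... the MySQL sync pipeline then runs against "db1-sync-snapshot" ...

# terminate the throwaway cluster once the sync finishes
rds.delete_db_cluster(
    DBClusterIdentifier="db1-sync-snapshot",
    SkipFinalSnapshot=True,
)
```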