Hi Guys am new to airflow and python. I need to run the tasks based on the value of a variable in the input json. If the value of the variable 'insurance' is "true" then task1, task2, task3 need to run else task4, task5, task6 need to run. Since am a newbie to this i dont have much idea about the usage of PythonOperator & BranchPythonOperator.
This is my input json:
{
  "car": {
    "engine_no": "123_st_456",
    "json": "{\"make\":\"Honda\",\"model\": Jazz, \"insurance\":\"true\",\"pollution\":\"true\" }"
  }
}
The code is given below:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
import logging
import json
default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}
dag = DAG('DAG_NAME',default_args=default_args,schedule_interval=None,max_active_runs=5, start_date=datetime(2020, 8, 4))   
PythonOperator(
   task_id = 'sample_task',
   python_callable = 'sample_fun',
   op_kwargs = {
       json  : '{{ dag_run.car.json}}'
   },
   provide_context=True,
   dag = dag
)
def sample_fun( json,**kwargs):
  insurance_flag = json.dumps(json)['insurance']
task1 = BashOperator(
    task_id='task1',
    bash_command='echo 1'
)
task2 = BashOperator(
    task_id='task2',
    bash_command='echo 2'
)
task3 = BashOperator(
    task_id='task3',
    bash_command='echo 3'
) 
task4 = BashOperator(
    task_id='task4',
    bash_command='echo 4'
)  
task5 = BashOperator(
    task_id='task5',
    bash_command='echo 5'
)
task6 = BashOperator(
    task_id='task6',
    bash_command='echo 6'
) 
if insurance_flag == "true":
    task1.dag = dag
    task2.dag = dag
    task3.dag = dag
    task1 >> task2 >> task3
    
else:
    task4.dag = dag
    task5.dag = dag
    task6.dag = dag
    task4 >> task5 >> task6
