I have a DAG defined in my_dag.py that uses the S3KeySensor in Airflow 2 to check whether an S3 key exists. When I use the sensor directly inside the DAG, it works:
# imports at the top of my_dag.py
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.task_group import TaskGroup

with TaskGroup('check_exists') as check_exists:
  path = 's3://my-bucket/data/my_file'
  poke_interval = 30
  timeout = 60*60
  mode = 'reschedule'
  dependency_name = 'my_file'
  S3KeySensor(
    task_id = 'check_' + dependency_name + '_exists',
    bucket_key = path,
    poke_interval = poke_interval,
    timeout = timeout,
    mode = mode
  )
The log of the above looks like:
[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
This is correct: the reschedule is expected because the file does not exist yet.
However, I want to check any number of paths from other DAGs as well, so I moved the sensor into a function called test in another file, helpers.py. In my_dag.py I call test from a PythonOperator inside the task group:
from airflow.operators.python import PythonOperator
from helpers import test

with TaskGroup('check_exists') as check_exists:
  path = 's3://my-bucket/data/my_file'
  dependency_name = 'my_file'
  wait_for_dependencies = PythonOperator(
    task_id = 'wait_for_my_file',
    python_callable = test,
    op_kwargs = {
      'dependency_name': dependency_name,
      'path': path
    },
    dag = dag
  )
  wait_for_dependencies
The function test in helpers.py looks like:
# helpers.py
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):
    S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )
However, when I run the DAG, the task is marked as success even though the file is not there. The logs show:
[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.
It seems Airflow doesn't like running a sensor via a PythonOperator: judging from the "Returned value was: None" line, the callable only instantiates the sensor object and never actually pokes S3. Is that true, or am I doing something wrong?
My goal is to loop through multiple paths and check whether each one exists. I need the same check in other DAGs, which is why I'm putting the sensor in a function that lives in another file.
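Roughly, what I'm hoping to end up with is something like the sketch below (the make_s3_check helper, the PATHS dict, and the second path are made up for illustration; I don't know whether returning the sensor from the helper is the right pattern):
# helpers.py (sketch)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

def make_s3_check(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):
    # return the sensor so the calling DAG can register it as a real task
    return S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )

# my_dag.py (sketch)
from airflow.utils.task_group import TaskGroup
from helpers import make_s3_check

PATHS = {
    'my_file': 's3://my-bucket/data/my_file',
    'other_file': 's3://my-bucket/data/other_file',
}

with TaskGroup('check_exists') as check_exists:
    for name, path in PATHS.items():
        make_s3_check(name, path)  # one sensor task per path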
If there are alternative ways to do this, I'm open to them!
Thanks for your help!