TL;DR: How do I create a dynamic EmailOperator that sends a file whose path is stored in an XCom value?
Hello Folks,
I'm using Apache Airflow 2.0.0.b2. My issue is that my DAG creates a file whose name changes at runtime. I'd like to email this file out, but I'm having trouble getting the dynamic file name into my EmailOperator.
Things I've tried that have failed:
1. Use templating for the files property:

    files=["{{ ti.xcom_pull(key='OUTPUT_CSV') }}"],

Unfortunately, templating only works when the field in the operator is marked as such. files is not a templatable field on the EmailOperator.

2. Dynamically create my task using a function:

    def get_email_operator(?...):
        export_file_path = ti.xcom_pull(key='OUTPUT_CSV')
        email_subject = 'Some Subject'
        return EmailOperator(
            task_id="get_email_operator",
            to=['someemail@somedomain.net'],
            subject=email_subject,
            files=[export_file_path,],
            html_content='<br>',
            dag=current_dag)
    ..
    task3 >> get_email_operator() >> task4

Unfortunately, I can't seem to figure out how to pass the current **kwargs or ti information into my function call to get the current filepath.
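To illustrate why the first attempt leaves the placeholder untouched, here is a toy sketch of the "only marked fields are rendered" rule. It is a simplified stand-in, not Airflow's actual code: `ToyOperator` is a hypothetical class, and it uses the standard library's `string.Template` (`$name` placeholders) in place of Jinja so that it runs without Airflow installed.

```python
from string import Template


class ToyOperator:
    """Hypothetical stand-in for an operator; NOT Airflow's real class."""

    # Only the attributes named here are rendered before execution.
    template_fields = ("subject",)

    def __init__(self, subject, files):
        self.subject = subject
        self.files = files

    def render_template_fields(self, context):
        # Render the declared fields only; every other attribute is left as-is.
        for name in self.template_fields:
            value = getattr(self, name)
            setattr(self, name, Template(value).substitute(context))


op = ToyOperator(subject="Report for $ds", files=["$output_csv"])
op.render_template_fields({"ds": "2020-11-20", "output_csv": "/tmp/out.csv"})

print(op.subject)  # Report for 2020-11-20
print(op.files)    # ['$output_csv'] (never rendered, so the literal placeholder remains)
```

Because `files` is not listed in `template_fields`, the placeholder string is passed through unchanged, which mirrors what happens to the `{{ ti.xcom_pull(...) }}` string in the first attempt.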
EDIT: Elad's answer below set me in the right direction. The only thing I had to do to make it work was to pass kwargs when calling op.execute().
Solution:
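from airflow.operators.email import EmailOperator


def get_email_operator(**kwargs):
    # Pull the file path that an upstream task pushed to XCom at runtime.
    export_file_path = kwargs['ti'].xcom_pull(key='OUTPUT_CSV')
    # date_string is defined outside this snippet.
    email_subject = 'Termed Drivers - ' + date_string
    # Build the operator inside the running task so the path is already known.
    op = EmailOperator(
        task_id="get_email_operator",
        to=['someemail@somedomain.net'],
        subject=email_subject,
        files=[export_file_path,],
        html_content='<br>')
    # Run the operator directly, passing the task context through.
    op.execute(kwargs)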