I am using Cloud Composer 2.0.0 with Airflow 2.1.4. I have created a KubernetesPodOperator task whose container tries to access an Airflow connection stored in Google Secret Manager, but it cannot locate the credentials (see the exceptions below). I have also tried passing the connection URI to the pod through env_vars in the KubernetesPodOperator, and passing it to the pod as a Kubernetes Secret, but neither approach works.
Below is my code for both cases:
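    # Attempt 1: pass the connection URI to the pod as an environment variable
    from airflow.hooks.base import BaseHook
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    from kubernetes.client import models as k8s

    aws_uri = BaseHook.get_connection('aws_conn').get_uri()
    download_file = KubernetesPodOperator(
        task_id="download_file_s3_to_gcs",
        dag=dag,
        name="download_file_s3_to_gcs",
        namespace=NAMESPACE,
        in_cluster=True,
        image=IMAGE_NAME,
        arguments=[
            "python3",
            "%s" % FILENAME,
        ],
        service_account_name=K_SERVICE_ACCOUNT,
        env_vars=[
            k8s.V1EnvVar(name="gcp_conn_id", value=GCP_CONN_ID),
            k8s.V1EnvVar(name="aws_conn_id", value=aws_uri),
        ],
        is_delete_operator_pod=True,
    )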

    # Attempt 2: expose the connection to the pod through a Kubernetes Secret
    from airflow.kubernetes.secret import Secret

    secret_aws_conn_id = Secret(
        deploy_type='env',
        deploy_target='AWS_CONN_ID',
        secret='aws-conn-id',  # my Kubernetes secret
        key='aws-conn-key',
    )
    download_file = KubernetesPodOperator(
        task_id="download_file_s3_to_gcs",
        dag=dag,
        name="download_file_s3_to_gcs",
        namespace=NAMESPACE,
        in_cluster=True,
        image=IMAGE_NAME,
        arguments=[
            "python3",
            "%s" % FILENAME,
        ],
        service_account_name=K_SERVICE_ACCOUNT,
        secrets=[secret_aws_conn_id],
        env_vars=[k8s.V1EnvVar(name="gcp_conn_id", value=GCP_CONN_ID)],
        is_delete_operator_pod=True,
    )
When I print the connection's URI, I get the correct value, but the job always throws the exceptions below:
[2022-04-01 08:01:09,563] {pod_manager.py:197} INFO - botocore.exceptions.NoCredentialsError: Unable to locate credentials
[2022-04-01 08:01:09,374] {pod_manager.py:197} INFO - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: connection
[2022-04-01 08:01:09,375] {pod_manager.py:197} INFO - [SQL: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
[2022-04-01 08:01:09,375] {pod_manager.py:197} INFO - FROM connection
[2022-04-01 08:01:09,376] {pod_manager.py:197} INFO - WHERE connection.conn_id = ?
[2022-04-01 08:01:09,376] {pod_manager.py:197} INFO - LIMIT ? OFFSET ?]
[2022-04-01 08:01:09,354] {pod_manager.py:197} INFO - [2022-04-01 08:01:09,297] {connection.py:407} ERROR - Unable to retrieve connection from secrets backend (MetastoreBackend). Checking subsequent secrets backend.
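For context, the "no such table: connection" error suggests that the script inside the pod is looking the connection up in a local Airflow metadata database that was never initialised there. A minimal sketch of an alternative, under stated assumptions: the container script reads the URI that the first attempt already passes in the `aws_conn_id` environment variable and splits it into explicit credentials. The helper name `parse_aws_conn_uri` and the sample URI are hypothetical, and the sketch assumes the URI has the form returned by `get_uri()`, with the login and password URL-encoded.

```python
import os
from urllib.parse import unquote, urlsplit


def parse_aws_conn_uri(uri):
    """Split an Airflow-style connection URI into AWS credential fields.

    Hypothetical helper: assumes login = access key id and
    password = secret access key, both URL-encoded in the URI.
    """
    parts = urlsplit(uri)
    return {
        "aws_access_key_id": unquote(parts.username) if parts.username else None,
        "aws_secret_access_key": unquote(parts.password) if parts.password else None,
    }


# Made-up sample value used only when the environment variable is absent;
# inside the pod the real URI would come from the "aws_conn_id" variable.
sample_uri = "aws://AKIAEXAMPLE:abc%2Fdef%2B123@"
creds = parse_aws_conn_uri(os.environ.get("aws_conn_id", sample_uri))
```

The resulting dictionary could then be passed as keyword arguments to `boto3.client("s3", **creds)`, so that botocore does not have to discover credentials on its own. Whether this fits depends on how the script in the image currently builds its S3 client, which is not shown here.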
Can someone please help me resolve this issue?