Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
No response
Apache Airflow version
3.0.3
Operating System
Linux (amazon linux AL2023), EKS 1.32
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
Since upgrading to Airflow 3.0.3, our DAG using EksPodOperator is no longer able to authenticate with our EKS cluster.
When the DAG runs, it fails with a 401 Unauthorized error when trying to list pods:
https://github.com/apache/airflow/blob/3.0.3/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py#L627
From the EKS side, we can see in the audit logs that the API call is made with no user identity:
```json
{
  "kind": "Event",
  "apiVersion": "audit.k8s.io/v1",
  "level": "Request",
  "auditID": "5007765e-80ab-475b-a30d-6bd769ea71f2",
  "stage": "ResponseStarted",
  "requestURI": "/api/v1/namespaces/<YOUR_NAMESPACE>/pods?labelSelector=dag_id%3Dwoke_eks%2Ckubernetes_pod_operator%3DTrue%2Crun_id%3Dmanual__2025-07-18T213454.6384250000-30e0ce312%2Ctask_id%3Dexecute_k8s_eks_work%2Calready_checked%21%3DTrue%2C%21airflow-worker",
  "verb": "list",
  "user": {},
  "sourceIPs": ["XXXX"],
  "userAgent": "OpenAPI-Generator/32.0.1/python",
  "objectRef": {
    "resource": "pods",
    "namespace": "function-execution-dev",
    "apiVersion": "v1"
  },
  "responseStatus": {
    "metadata": {},
    "status": "Failure",
    "message": "Unauthorized",
    "reason": "Unauthorized",
    "code": 401
  },
  "requestReceivedTimestamp": "2025-07-18T21:35:02.934260Z",
  "stageTimestamp": "2025-07-18T21:35:02.934497Z"
}
```
We found a potential fix in an upstream PR.
We tried backporting the commit from that PR into our 3.0.3 deployment. Although it removed the ORM stack trace from the logs, the authentication issue persists.
Based on the explanation in the PR, we suspect the issue is related to how the AWS connection is retrieved. We store our AWS connection in the Airflow metadata database, and if the Task SDK-based runtime can no longer initialize the ORM, it may also be unable to reach the metadata database to fetch that connection.
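To test that hypothesis from inside a running task, a probe DAG along these lines can help (a sketch only; the connection id is ours, adjust to your own):

```python
from airflow.sdk import dag, task  # Airflow 3 Task SDK


@task
def probe_connection():
    # Hypothetical probe: resolve the AWS connection from inside a running task.
    # If the Task SDK runtime cannot reach the metadata database, this should
    # raise instead of returning the connection.
    from airflow.hooks.base import BaseHook

    conn = BaseHook.get_connection("val_aws_assume_test")
    print(conn.conn_type, sorted(conn.extra_dejson))


@dag(schedule=None)
def probe_aws_connection():
    probe_connection()


probe_aws_connection()
```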
Here are some tests we performed and their results:
From the worker, we manually ran the token generation:
```bash
python3.12 -m airflow.providers.amazon.aws.utils.eks_get_token \
  --cluster-name CLUSTER \
  --region-name eu-central-1 \
  --aws-conn-id val_aws_assume_test
```
We used the generated token to query the Kubernetes API:
```bash
curl -s --header "Authorization: Bearer $TOKEN" \
  --insecure \
  "$APISERVER/api/v1/namespaces/<YOUR_NAMESPACE>/pods?labelSelector=dag_id%3Dwoke_eks%2Ckubernetes_pod_operator%3DTrue%2Crun_id%3Dmanual__2025-07-18T134438.5412460000-53e546ffb%2Ctask_id%3Dexecute_k8s_eks_work%2Calready_checked%21%3DTrue%2C%21airflow-worker"
```
✅ This works as expected (so our connection and roles are correctly configured).
However, when we export PYTHON_OPERATORS_VIRTUAL_ENV_MODE=1, the same command generates a token that is invalid and results in a 401 error.
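For completeness, the failing case can be reproduced in one step by wrapping the same command (a sketch; all values are ours):

```python
import os
import subprocess
import sys

# Re-run the token generation with PYTHON_OPERATORS_VIRTUAL_ENV_MODE=1 set,
# mimicking the environment in which the token comes back invalid.
env = {**os.environ, "PYTHON_OPERATORS_VIRTUAL_ENV_MODE": "1"}
result = subprocess.run(
    [
        sys.executable,
        "-m",
        "airflow.providers.amazon.aws.utils.eks_get_token",
        "--cluster-name", "CLUSTER",
        "--region-name", "eu-central-1",
        "--aws-conn-id", "val_aws_assume_test",
    ],
    env=env,
    capture_output=True,
    text=True,
    check=False,
)
print(result.stdout)  # this token is the one the API server rejects with 401
```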
We also tried blocking the deletion of the temporary kubeconfig so we could debug directly from the worker pod. During local testing with that kubeconfig, we hit multiple issues:
- unsupported API versions (alpha1)
- regex overmatch problems
- errors when setting PYTHON_OPERATORS_VIRTUAL_ENV_MODE=1

None of these tests gave conclusive insights; we are currently unable to use the generated kubeconfig reliably for local validation.
What you think should happen instead
The operator should authenticate with the AWS connection the same way it did in Airflow 2.x.
How to reproduce
Set up EKS and create a connection of type AWS (we have tried both assume-role and static user credentials).
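For reference, a minimal sketch of how such a connection can be defined via an environment variable; the connection name mirrors ours, and the role ARN is a placeholder:

```python
import os

# Hypothetical assume-role connection; the URI query parameters become the
# connection extras read by the AWS provider (role_arn, region_name).
os.environ["AIRFLOW_CONN_VAL_AWS_ASSUME_TEST"] = (
    "aws://?role_arn=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fairflow-eks"
    "&region_name=eu-central-1"
)
```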
The following DAG:
```python
from __future__ import annotations

import os
import datetime
import logging
from pathlib import Path

from airflow.sdk import DAG
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.amazon.aws.operators.eks import EksPodOperator
from kubernetes.client import models as k8s

with DAG(
    dag_id=Path(__file__).stem.removeprefix("dag_"),
    dag_display_name="woke pod",
    schedule=None,
    catchup=False,
    # concurrency=1,
    max_active_runs=1,
    start_date=datetime.datetime(2022, 3, 4),
    tags=["poc"],
    is_paused_upon_creation=True,
    doc_md=__doc__,
    params={
        "containerImage": Param(
            "",
            type="string",
            title="containerImage",
            examples=["<IMAGE_TO_DEPLOY>"],
        ),
    },
) as dag:
    logger = logging.getLogger(__name__)
    logger.info("This is a log message")

    securityContextRunAsUser = int(os.getenv("K8S_SECURITYCONTEXT_RUNASUSER", 0))
    volumeClaim = os.getenv("K8S_VOLUMECLAIM")
    serviceAccount = os.getenv("K8S_SERVICE_ACCOUNT_NAME")
    tolerationsIsolatedWorkload = os.getenv("K8S_TOLERATIONS_ISOLATEDWORKLOAD")
    clusterName = os.getenv("K8S_CLUSTER_NAME")
    k8sNamespace = os.getenv("K8S_NAMESPACE")

    exec_k8s_eks = EksPodOperator(
        task_id="execute_k8s_eks_work",
        pod_name="worky_eks_pod_name",
        cluster_name=clusterName,
        aws_conn_id="<YOUR_CONNECTION_ID>",
        namespace=k8sNamespace,
        image="{{ params.containerImage }}",
        cmds=["sleep", "60"],
        get_logs=True,
        deferrable=False,
        container_logs=True,  # debugging: to actually see stdout for containers without airflow UI (nice for dev)
        tolerations=[
            k8s.V1Toleration(
                key="IsolatedWorkload", operator="Equal", value=tolerationsIsolatedWorkload, effect="NoSchedule"
            )
        ],
        volumes=(
            [
                k8s.V1Volume(
                    name=volumeClaim,
                    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=volumeClaim),
                )
            ]
            if volumeClaim
            else []
        ),
        volume_mounts=(
            [k8s.V1VolumeMount(name=volumeClaim, mount_path=f"/{volumeClaim}", sub_path=None, read_only=False)]
            if volumeClaim
            else []
        ),
        service_account_name=serviceAccount,
        security_context=k8s.V1PodSecurityContext(
            fs_group=securityContextRunAsUser,
            run_as_group=securityContextRunAsUser,
            run_as_user=securityContextRunAsUser,
        ),
    )

    exec_k8s_eks

if __name__ == "__main__":
    dag.test()
```
Run the DAG in Airflow and you should see the 401 in the task logs.
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct