Conversation

@viiccwen (Contributor)

Summary

This PR introduces two related fixes to stabilize ORM initialization and session handling in AWS-related utilities.

Changes

  1. Add `_ensure_db_session` in `eks_get_token`

    • Introduced `_ensure_db_session()` to explicitly initialize the ORM when `engine` or `Session` are `None`.
    • Ensures that running `eks_get_token` as a standalone CLI properly initializes Airflow settings before creating the `EksHook`.
    • Added a corresponding unit test, `test_ensure_db_session_initializes_orm`, in `test_eks_get_token.py` to validate this behavior.
  2. Refactor config initialization in `base_aws`

    • Moved the configuration creation logic inside the `get_session` method.
    • This ensures lazy evaluation and avoids potential issues when `Session` is not yet available at import time.
    • Improves reliability when invoking AWS hooks in contexts where the Airflow ORM is not pre-initialized.

Files Changed

  • airflow/providers/amazon/aws/utils/eks_get_token.py
  • airflow/providers/amazon/aws/utils/tests/test_eks_get_token.py
  • airflow/providers/amazon/aws/hooks/base_aws.py

Motivation

Previously, invoking eks_get_token directly from the CLI could fail because Airflow’s ORM (settings.engine, settings.Session) was not yet initialized. This PR makes ORM initialization explicit and reliable.

Additionally, deferring configuration creation inside get_session removes the risk of creating invalid or stale sessions during import, aligning with lazy evaluation best practices.

Testing

  • Added new unit test for _ensure_db_session to validate ORM initialization when engine and Session are None.

Related Issues

resolves #53578

Reproduction Steps

```shell
# 1) initialization
airflow db migrate
airflow connections add val_aws_assume_test \
  --conn-uri 'aws://FAKE_KEY:FAKE_SECRET@/?region_name=eu-central-1'

# 2) reproduce
export AWS_EC2_METADATA_DISABLED=true
export PYTHON_OPERATORS_VIRTUAL_ENV_MODE=1
python -m airflow.providers.amazon.aws.utils.eks_get_token \
  --cluster-name dummy --region-name eu-central-1
```

On the main branch:

```
[2025-08-16T16:13:59.043+0000] {base_aws.py:621} WARNING - Unable to find AWS Connection ID 'val_aws_assume_test', switching to empty.
[2025-08-16T16:13:59.044+0000] {base_aws.py:197} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name='eu-central-1'). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2025-08-16T16:13:59.048+0000] {base_aws.py:197} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name='eu-central-1'). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2025-08-16T16:13:59.619+0000] {base_aws.py:621} WARNING - Unable to find AWS Connection ID 'aws_default', switching to empty.
[2025-08-16T16:13:59.619+0000] {base_aws.py:197} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name='eu-central-1'). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py", line 64, in <module>
    main()
  File "/opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py", line 58, in main
    access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py", line 648, in fetch_access_token_for_cluster
    signed_url = signer.generate_presigned_url(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/botocore/signers.py", line 355, in generate_presigned_url
    self.sign(
  File "/usr/local/lib/python3.12/site-packages/botocore/signers.py", line 200, in sign
    auth.add_auth(request)
  File "/usr/local/lib/python3.12/site-packages/botocore/auth.py", line 421, in add_auth
    raise NoCredentialsError()
botocore.exceptions.NoCredentialsError: Unable to locate credentials
```

On the fix-aws-orm-initialization branch:

```
[2025-08-17T05:17:18.561+0000] {connection_wrapper.py:334} INFO - AWS Connection (conn_id='val_aws_assume_test', conn_type='aws') credentials retrieved from login and password.
[2025-08-17T05:17:19.043+0000] {base_aws.py:621} WARNING - Unable to find AWS Connection ID 'aws_default', switching to empty.
[2025-08-17T05:17:19.043+0000] {base_aws.py:197} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name='eu-central-1'). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
expirationTimestamp: 2025-08-17T05:31:19Z, token: k8s-aws-v1.aHR0cHM6Ly9zdHMuZXUtY2VudHJhbC0xLmFtYXpvbmF3cy5jb20vP0FjdGlvbj1HZXRDYWxsZXJJZGVudGl0eSZWZXJzaW9uPTIwMTEtMDYtMTUmWC1BbXotQWxnb3JpdGhtPUFXUzQtSE1BQy1TSEEyNTYmWC1BbXotQ3JlZGVudGlhbD1GQUtFX0tFWSUyRjIwMjUwODE3JTJGZXUtY2VudHJhbC0xJTJGc3RzJTJGYXdzNF9yZXF1ZXN0JlgtQW16LURhdGU9MjAyNTA4MTdUMDUxNzE5WiZYLUFtei1FeHBpcmVzPTYwJlgtQW16LVNpZ25lZEhlYWRlcnM9aG9zdCUzQngtazhzLWF3cy1pZCZYLUFtei1TaWduYXR1cmU9NjI2NmNhMzRjOTAyMTE4ZjdhOTA5YWU1MGI3NzkzMzg4ZDBkNGZiYmZlYzhmZjkwN2NiMzJkODE5NWMzZjU0Yg
```

- Add `_ensure_db_session` to initialize the ORM when engine or session are None
- Add `test_ensure_db_session_initializes_orm` to test ORM initialization logic
- Move config creation inside `get_session` for better lazy evaluation to avoid session issues
@jason810496 (Member)

Nice! Thanks for the fix.

- Rename `_ensure_db_session` to `_ensure_orm_configured`
- Simplify ORM check (remove session part)
- Add docstring with inline comment
- Update corresponding test function name and calls
@potiuk (Member)

I do not understand: where is eks_get_token going to be used, and in what circumstances?

Airflow Providers in Airflow 3 are supposed to not use Airflow ORM at all. So it sounds strange that there is a tool in provider that expects to initialize it.

What's the purpose and scenario of running this tool ?

@viiccwen (Contributor, Author)

The eks_get_token tool generates an EKS access token, which is required for interacting with the Kubernetes API. This token is essential for tasks such as using the EksPodOperator against an EKS cluster; it is part of the process that enables Airflow to authenticate and authorize interactions with Kubernetes Pods in an EKS cluster.

In this case, eks_get_token is a tool that depends on Airflow's ORM to correctly access AWS credentials stored in the metadata database. While Airflow 3 tries to reduce ORM dependency, some actions still require it for specific use cases like reading connection settings.

@jason810496 (Member)

Hi @potiuk, I just checked the usage of eks_get_token, and it appears to be a special case.

The EksPodOperator.execute method writes the kubeconfig file using EksHook.generate_config_file:

```python
def execute(self, context: Context):
    eks_hook = EksHook(
        aws_conn_id=self.aws_conn_id,
        region_name=self.region,
    )
    with eks_hook.generate_config_file(
        eks_cluster_name=self.cluster_name, pod_namespace=self.namespace
    ) as self.config_file:
        return super().execute(context)
```

The generate_config_file function leverages the eks_get_token module to generate the kubeconfig via EksHook.fetch_access_token_for_cluster:

```python
@contextmanager
def generate_config_file(
    self,
    eks_cluster_name: str,
    pod_namespace: str | None,
) -> Generator[str, None, None]:
    """
    Write the kubeconfig file given an EKS Cluster.

    :param eks_cluster_name: The name of the cluster to generate kubeconfig file for.
    :param pod_namespace: The namespace to run within kubernetes.
    """
    args = ""
    if self.region_name is not None:
        args = args + f" --region-name {self.region_name}"
    if self.aws_conn_id is not None:
        args = args + f" --aws-conn-id {self.aws_conn_id}"
    # We need to determine which python executable the host is running in order to correctly
    # call the eks_get_token.py script.
    python_executable = f"python{sys.version_info[0]}.{sys.version_info[1]}"
    # Set up the client
    eks_client = self.conn
    # Get cluster details
    cluster = eks_client.describe_cluster(name=eks_cluster_name)
    cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
    cluster_ep = cluster["cluster"]["endpoint"]
    cluster_config = {
        "apiVersion": "v1",
        "kind": "Config",
        "clusters": [
            {
                "cluster": {"server": cluster_ep, "certificate-authority-data": cluster_cert},
                "name": eks_cluster_name,
            }
        ],
        "contexts": [
            {
                "context": {
                    "cluster": eks_cluster_name,
                    "namespace": pod_namespace,
                    "user": _POD_USERNAME,
                },
                "name": _CONTEXT_NAME,
            }
        ],
        "current-context": _CONTEXT_NAME,
        "preferences": {},
        "users": [
            {
                "name": _POD_USERNAME,
                "user": {
                    "exec": {
                        "apiVersion": AUTHENTICATION_API_VERSION,
                        "command": "sh",
                        "args": [
                            "-c",
                            COMMAND.format(
                                python_executable=python_executable,
                                eks_cluster_name=eks_cluster_name,
                                args=args,
                            ),
                        ],
                        "interactiveMode": "Never",
                    }
                },
            }
        ],
    }
    config_text = yaml.dump(cluster_config, default_flow_style=False)
    with tempfile.NamedTemporaryFile(mode="w") as config_file:
        config_file.write(config_text)
        config_file.flush()
        yield config_file.name
```

This PR is more of a quick fix to resolve #53578.

To fully address the issue without directly accessing the ORM, we would need to introduce a new CommsDecoder for this case. That would require more effort and could be taken up in a follow-up PR IMO.

Additionally, @viiccwen, it would be great if you could test this fix with a real EKS setup, thanks!

@potiuk (Member) commented Aug 18, 2025

> In this case, eks_get_token is a tool that depends on Airflow's ORM to correctly access AWS credentials stored in the metadata database. While Airflow 3 tries to reduce ORM dependency, some actions still require it for specific use cases like reading connection settings.

task.sdk already supports retrieving credentials stored in airflow configuration and this is THE way to retrieve it. I assume this command is run internally in the worker, not manually by the user at some point in time, and in this case just using Connection.get() from task.sdk is THE way of retrieving it - with potentially a fallback to the older mechanism when task.sdk is not installed. @amoghrajesh @kaxil @ashb -> am I right here?

@potiuk (Member) commented Aug 18, 2025

Simply: unconditional ORM configuration in provider's code run in the worker is just plain wrong. It would only be acceptable if it is run by a human running this command manually after logging in to one of the containers that have database access - scheduler, triggerer, dag file processor. When your code is run in the worker on Airflow 3, you have no access to database credentials and ORM initialization will fail.

@jason810496 (Member)

I understand your point, and I agree that in Airflow 3 we should not have access to the database.

If I am not mistaken about the purpose of the eks_get_token module, it is designed to be called directly with the Python executable, rather than being initialized through the TaskSDK. Since it is executed by Python directly, and not within an executor, triggerer, or similar process, it operates outside the typical Airflow runtime.

```shell
export PYTHON_OPERATORS_VIRTUAL_ENV_MODE=1
output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \
  --cluster-name {eks_cluster_name} {args} 2>&1)
```

The EksHook here represents a special case, as it is invoked by a shell command such as:

```shell
sh -c output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token --cluster-name {eks_cluster_name} {args} 2>&1) ...
```

Therefore, if we truly want to eliminate database access when retrieving connections in the context of EksHook, it seems necessary to introduce a provider-specific (or connection-specific) CommsDecoder for obtaining connection credentials via TaskSDK. Since initialization of SUPERVISOR_COMMS occurs within the task_runner, it may be possible to initialize SUPERVISOR_COMMS within the main function of the eks_get_token module. This approach could resolve the error without requiring database access.

@potiuk (Member) commented Aug 18, 2025

> If I am not mistaken about the purpose of the eks_get_token module, it is designed to be called directly with the Python executable, rather than being initialized through TaskSDK. Since it is executed by Python directly, and not within executor, triggerer, or similar processes, it operates outside the typical Airflow runtime.

Well, from what I see this command is executed on the pod within the EKS cluster - which means the EKS cluster would have to have DB credentials in order to initialize the ORM, and the worker does not have those credentials anyway (in Airflow 3). Initialising the ORM in this place is just not going to work - unless of course the EKS cluster has the credentials, which is obviously wrong according to the Airflow security model.

I think though there is a way better solution - one that is also "proper": the credentials of the AWS connection should be uploaded to the EKS cluster (temporarily) as a secret, the PODs run on EKS should use that secret, and the secret should be deleted after the pod completes. You cannot really get the EKS cluster that is used there to communicate with the TaskSDK, because there will normally be no firewalls opened for the cluster worker to communicate with the API server (not to mention that, similarly, if you want ORM/DB access, the EKS cluster would need the capability to communicate back with the Airflow DB).

In either case - I think initializing the ORM does not solve the problem at all

@jason810496 (Member)

> unless of course EKS cluster has the credentials which is obviously wrong according to the Airflow security model.

I see and agree with your point.
I overlooked the key issue that the EKS cluster would need to have database credentials in order to initialize the ORM. Thank you for highlighting this.

> I think though there is a way better solution - one that is also "proper": the credentials of the AWS connection should be uploaded to the EKS cluster (temporarily) as a secret, the PODs run on EKS should use that secret, and the secret should be deleted after the pod completes. You cannot really get the EKS cluster that is used there to communicate with the TaskSDK, because there will normally be no firewalls opened for the cluster worker to communicate with the API server (not to mention that, similarly, if you want ORM/DB access, the EKS cluster would need the capability to communicate back with the Airflow DB).

Yes, setting the token with the credentials from the AWS connection is a much more appropriate solution in this case, and it also aligns with the behavior of other providers.

So, @viiccwen, it would be helpful to refactor the get_eks_token logic to use the Airflow Connection extra field (perhaps named eks_sa_token). Thank you!

The implementation could follow the pattern used in athena_sql for retrieving the driver config from the extra field:

```python
return dict(
    driver=self.conn.extra_dejson.get("driver", "rest"),
    schema_name=self.conn.schema,
    region_name=self.conn.region_name,
    aws_domain=self.conn.extra_dejson.get("aws_domain", "amazonaws.com"),
)
```

Additionally, we should document this breaking change for EksHook, and provide guidance for users on how to set up credentials using Airflow Connection. For example, users should create a Service Account in EKS first, then set <SERVICE-ACCOUNT-TOKEN> in the Airflow Connection as eks_sa_token in the extra field.

cc @o-nikolas @vincbeck
Do you have any comments or feedback on this?

@viiccwen (Contributor, Author)

hello @jason810496 @potiuk,
big thanks for spending so much time reviewing and providing feedback on this PR 🙏
Your opinion helps a lot, I’ll revisit the points raised and work on another solution for this issue!

@o-nikolas (Contributor)

Since #54083 was merged, all the EKS tests on our system test dashboard have been broken: https://aws-mwaa.github.io/#/open-source/system-tests/dashboard.html

From what I can tell this PR is solving the same issue (or at least may solve the same issue).

@viiccwen Have you made any progress on this task? I'd like to help expedite this one if possible!

@viiccwen (Contributor, Author) commented Sep 3, 2025

> Since #54083 was merged all the EKS tests on our system test dashboard have been broken: https://aws-mwaa.github.io/#/open-source/system-tests/dashboard.html
>
> From what I can tell this PR is solving the same issue (or at least may solve the same issue).
>
> @viiccwen Have you made any progress on this task? I'd like to help expedite this one if possible!

Sorry for the late reply; I didn't see this comment until you opened the PR. 🥲
Thanks for solving the issue, I'll close this PR later.


Labels

area:providers provider:amazon AWS/Amazon - related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

EKSPodOperator authentication issue using connections from metadata-database in Airflow 3.0.3

6 participants