Support automatic/short-lived token generation for AWS Athena #691

Closed
benjamin-awd opened this issue Nov 20, 2023 · 8 comments
Labels: area:config, area:performance, area:profile, dbt:run, priority:medium, profile:athena

Comments

benjamin-awd (Contributor) commented Nov 20, 2023

Spent some time thinking about the best way to pass short-lived AWS credentials to Cosmos -- including the aws_session_token in the profile will have limited utility for anyone using an Airflow connection, since the token will expire each hour (unless the connection is being dynamically refreshed or something, which sounds pretty unwieldy).

These are the options that I've tried/considered so far:

Option 1: Create a custom profile via the MWAA start-up script, and pass the profile name to Cosmos

e.g.

aws configure set role_arn arn:aws:iam::xxxx:role/cosmos_test --profile cosmos
aws configure set credential_source EcsContainer --profile cosmos
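
For reference, those two commands would produce an AWS config entry along these lines (the account placeholder is the author's):

[profile cosmos]
role_arn = arn:aws:iam::xxxx:role/cosmos_test
credential_source = EcsContainer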

The execution role is then able to assume the target role; running something like

aws sts get-caller-identity --profile cosmos

returns

{
    "UserId": "AROA2SDLV6CB4Y123456:botocore-session-12345",
    "Account": "123456",
    "Arn": "arn:aws:sts::123456:assumed-role/cosmos_test/botocore-session-12345"
}

It's then possible to pass this profile to Cosmos, which passes it to dbt-athena via the aws_profile_name argument. However, I couldn't get this approach to work, since for some reason boto3 can't retrieve the credentials from the ECS/EC2 metadata endpoint.

{{config.py:168}} INFO - Creating temporary profiles.yml at /tmp/tmpesa3d1yx/profiles.yml with the following contents:
athena:
    outputs:
        stg:
            aws_profile_name: cosmos
            database: awsdatacatalog
            region_name: us-east-1
            s3_data_dir: ...
            s3_staging_dir: ...
            schema: ...
            type: athena
            work_group: ...
    target: stg

{{subprocess.py:73}} INFO - Running command: ['/usr/local/airflow/dbt_venv/bin/dbt', 'run', '--vars <redacted>', '--profiles-dir', '/tmp/tmpesa3d1yx', '--profile', 'athena', '--target', 'stg']
{{subprocess.py:84}} INFO - Command output:
{{subprocess.py:94}} INFO - [0m08:36:27  Running with dbt=1.6.7
{{subprocess.py:94}} INFO - [0m08:36:27  Registered adapter: athena=1.6.1
...
{{subprocess.py:94}} INFO - [0m08:36:35  Athena adapter: Got an error when attempting to open a Athena connection due to 
Error when retrieving credentials from EcsContainer:  No credentials found in credential_source referenced in profile cosmos

IMO this approach isn't great, since it involves creating a config file instead of using a role right away.

Option 2: Make a boto3 STS assume call and inject the credentials during Cosmos profile generation

This is the (slightly bootleg) approach we ended up taking, since it allows a fresh set of credentials to be generated for each run.

I think, as a potential feature, it would be nice if Cosmos could take an Airflow connection with a role_arn and handle the generation of short-term credentials under the hood (similar to the AWS Athena Operator + AWS Base Hook, where you pass in an Airflow connection with a role_arn argument):

from typing import Any

import boto3
from airflow.utils.log.secrets_masker import mask_secret
from cosmos.profiles import BaseProfileMapping

from constants import DBT_ATHENA_ROLE_ARN

class AthenaAccessKeyProfileMapping(BaseProfileMapping):
    """
    Maps Airflow Athena connections using access key +
    secret access key authentication to dbt profiles.
    """

    airflow_connection_type: str = "aws"
    dbt_profile_type: str = "athena"

    required_fields = [
        "database",
        "schema",
        "s3_staging_dir",
        "region_name",
    ]

    airflow_param_mapping = {
        "database": "extra.database",
        "schema": "extra.schema",
        "s3_staging_dir": "extra.s3_staging_dir",
        "s3_data_dir": "extra.s3_data_dir",
        "region_name": "extra.region_name",
        "work_group": "extra.work_group",
    }

    @property
    def profile(self) -> dict[str, Any | None]:
        """
        Creates a dbt profile for Athena.

        To avoid exposing the credentials in plaintext, the secret access key
        and session token are masked using Airflow's default `sensitive_var_conn_names`
        core configuration.
        """
        credentials = self.get_temp_credentials()
        mask_secret(credentials)
        profile = {
            **self.mapped_params,
            **self.profile_args,
            "aws_access_key_id": credentials["AccessKeyId"],
            "aws_secret_access_key": credentials["SecretAccessKey"],
            "aws_session_token": credentials["SessionToken"],
        }
        return self.filter_null(profile)

    @staticmethod
    def get_temp_credentials() -> dict[str, str]:
        """Helper function to retrieve temporary short lived credentials"""
        sts_client = boto3.client("sts")
        assumed_role = sts_client.assume_role(
            RoleArn=DBT_ATHENA_ROLE_ARN, RoleSessionName="DbtAthenaRoleSession"
        )
        return assumed_role["Credentials"]

(EDIT: currently running into memory issues with this approach, so it might not be feasible.

My gut feeling is that the memory issues are caused by the repeated creation of boto3 clients across multiple threads, since boto3 does not seem very memory efficient (boto/boto3#1670, boto/boto3#3541). I think using the AwsBaseHook class instead of raw boto3 might resolve this; a rough sketch of that variant follows below.)
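
To make that concrete, here is a minimal sketch (not from the issue) of get_temp_credentials rewritten on top of the Amazon provider's AwsGenericHook. The connection id "aws_default" is illustrative, and any role_arn would be configured in the connection's extras:

from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook

def get_temp_credentials(aws_conn_id: str = "aws_default") -> dict[str, str]:
    """Fetch short-lived credentials via the provider hook, which manages
    the boto3 session (and any role_arn on the connection) instead of
    creating a new STS client per call."""
    hook = AwsGenericHook(aws_conn_id=aws_conn_id)
    # get_credentials() returns a frozen ReadOnlyCredentials namedtuple
    # with access_key, secret_key and token fields.
    creds = hook.get_credentials()
    return {
        "AccessKeyId": creds.access_key,
        "SecretAccessKey": creds.secret_key,
        "SessionToken": creds.token,
    }

This mirrors what PR #758 (referenced below) ended up doing via get_credentials().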

Option 3: Mutate DbtLocalOperator class so that AWS credentials are generated before every run

e.g. export $(aws configure export-credentials --profile cosmos --format=env) && dbt run ...

I think this option is quite complex and doesn't gel that well with the rest of Cosmos, since it creates floating env vars that aren't properly managed.
(IIRC there's an issue/PR related to this that talks about adding custom commands to dbt run)
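
For completeness, a hedged sketch (not from the issue) of what the export step could look like if done from Python rather than a shell wrapper, reusing the "cosmos" profile from Option 1. It parses the `export KEY=value` lines that `aws configure export-credentials --format env` emits:

import subprocess

def short_lived_env(profile: str = "cosmos") -> dict[str, str]:
    """Return the AWS_* variables exported by the AWS CLI as a mapping
    that could be merged into the dbt subprocess environment."""
    out = subprocess.run(
        ["aws", "configure", "export-credentials", "--profile", profile, "--format", "env"],
        check=True,
        capture_output=True,
        text=True,
    ).stdout
    env: dict[str, str] = {}
    for line in out.splitlines():
        if line.startswith("export "):
            key, _, value = line.removeprefix("export ").partition("=")
            env[key] = value
    return env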

dosubot bot commented Nov 20, 2023

Thank you for your detailed analysis and proposed solutions for supporting automatic/short-lived token generation for AWS Athena in Cosmos. It's great to see your enthusiasm and willingness to contribute!

Based on your options, it seems like Option 2, where Cosmos handles the generation of short-term credentials under the hood, would be a valuable addition. This approach aligns with the existing AWS Athena Operator and AWS Base Hook in Apache Airflow, making it more consistent and intuitive for users.

I encourage you to proceed with your plan to create a pull request (PR) for Option 2. By implementing this feature, you will not only enhance the functionality of Cosmos but also contribute to the broader community of users who rely on AWS Athena with Apache Airflow.

Once you have created the PR, the maintainers will review it and provide feedback. Thank you for your contribution and for making Cosmos even better!



octiva (Contributor) commented Dec 8, 2023

Hey @benjamin-awd @tatiana, I worked on this yesterday and implemented a solution where we get the connection details from the AwsGenericHook, which is based on the Airflow connection.

I have tests written and an example DAG in the dev astro env.

I think it would be better if Cosmos were more closely aligned with Airflow providers that utilise Airflow Connections.

I have a branch ready, and would like to open a PR for your review if possible.

benjamin-awd (Contributor, Author) commented

Sure, feel free to raise the PR and I'll take a look :)

tatiana (Collaborator) commented Dec 8, 2023

Looking forward to seeing your PR, @octiva !

octiva (Contributor) commented Dec 8, 2023

Hey @tatiana & @benjamin-awd, I can't seem to push via HTTPS or SSH... Do I need to be given some permissions?

jlaneve (Collaborator) commented Dec 8, 2023

Hey @octiva, you'll likely need to fork the repository into your personal account and push changes there. Then it's super easy to open the PR from your fork back into the main repository here!

octiva (Contributor) commented Dec 11, 2023

Hey @benjamin-awd, would you mind providing some feedback on the PR: #758

Would be much appreciated!

arojasb3 pushed a commit to arojasb3/astronomer-cosmos that referenced this issue Jul 14, 2024
## Description

Passes the `conn_id` to the `AwsGenericHook` and uses
`get_credentials()`, which handles the creation of a session,
credentials, freezing of credentials & also masking.

[See get_credentials() docs
here](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/hooks/base_aws/index.html#airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook.get_credentials)

## Related Issue(s)

Closes: astronomer#691 

Co-authored-by: Spencer horton <spencer.horton@workcoverqld.com.au>
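
For readers arriving later, a hedged usage sketch (not from the thread) of the mapping this PR introduced, assuming it landed as cosmos.profiles.AthenaAccessKeyProfileMapping; the connection id and profile args below are illustrative:

from cosmos import ProfileConfig
from cosmos.profiles import AthenaAccessKeyProfileMapping

profile_config = ProfileConfig(
    profile_name="athena",
    target_name="stg",
    # "aws_athena_conn" is an illustrative Airflow connection id; the
    # mapping obtains frozen short-lived credentials via AwsGenericHook.
    profile_mapping=AthenaAccessKeyProfileMapping(
        conn_id="aws_athena_conn",
        profile_args={
            "database": "awsdatacatalog",
            "schema": "stg",
            "s3_staging_dir": "s3://my-bucket/athena-results/",
            "region_name": "us-east-1",
        },
    ),
)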