Add a ProfileMapping to support Athena aws_session_token
#609
Comments
Spent some time thinking about the best way to pass short-lived AWS credentials to Cosmos. I'll still go ahead with the PR, since that will enable the session token to be used at the DAG level. However, the ideal is to generate the credentials on a per-task basis. These are the options that I've tried/considered so far:

**Option 1: Create a custom profile via the MWAA start-up script, and pass the profile name to Cosmos**

e.g.

```shell
aws configure set role_arn arn:aws:iam::xxxx:role/cosmos_test --profile cosmos
aws configure set credential_source EcsContainer --profile cosmos
```

The execution role is then able to assume the role, and it's then possible to pass the resulting credentials to Cosmos, which passes them to dbt-athena via the profile.
IMO this approach isn't that good, since it involves creating a config file instead of using a role right away.

**Option 2: Make a boto3 STS assume call and inject the credentials during Cosmos profile generation**

This is the (slightly bootleg) approach we ended up taking, since it allows a fresh set of credentials to be generated for each run. I think as a potential feature, it would be nice if Cosmos could take in an Airflow connection with a `role_arn` and handle the generation of short-term credentials under the hood (similar to the AWS Athena Operator + AWS Base Hook, where you pass in an Airflow connection with a `role_arn`).

```python
from typing import Any

import boto3
from airflow.utils.log.secrets_masker import mask_secret
from cosmos.profiles import BaseProfileMapping

from constants import DBT_ATHENA_ROLE_ARN


class AthenaAccessKeyProfileMapping(BaseProfileMapping):
    """
    Maps Airflow Athena connections using access key +
    secret access key authentication to dbt profiles.
    """

    airflow_connection_type: str = "aws"
    dbt_profile_type: str = "athena"

    required_fields = [
        "database",
        "schema",
        "s3_staging_dir",
        "region_name",
    ]

    airflow_param_mapping = {
        "database": "extra.database",
        "schema": "extra.schema",
        "s3_staging_dir": "extra.s3_staging_dir",
        "s3_data_dir": "extra.s3_data_dir",
        "region_name": "extra.region_name",
        "work_group": "extra.work_group",
    }

    @property
    def profile(self) -> dict[str, Any | None]:
        """
        Creates a dbt profile for Athena.

        To avoid exposing the credentials in plaintext, the secret access key
        and session token are masked using Airflow's default
        `sensitive_var_conn_names` core configuration.
        """
        credentials = self.get_temp_credentials()
        mask_secret(credentials)

        profile = {
            **self.mapped_params,
            **self.profile_args,
            "aws_access_key_id": credentials["AccessKeyId"],
            "aws_secret_access_key": credentials["SecretAccessKey"],
            "aws_session_token": credentials["SessionToken"],
        }
        return self.filter_null(profile)

    @staticmethod
    def get_temp_credentials() -> dict[str, str]:
        """Helper function to retrieve temporary short-lived credentials."""
        sts_client = boto3.client("sts")
        assumed_role = sts_client.assume_role(
            RoleArn=DBT_ATHENA_ROLE_ARN, RoleSessionName="DbtAthenaRoleSession"
        )
        return assumed_role["Credentials"]
```

(EDIT: currently running into memory issues with this approach, so it might not be feasible)

**Option 3: Mutate the DbtLocalOperator class so that AWS credentials are generated before every run**

I think this option is quite complex and doesn't gel that well with the rest of Cosmos, since it creates floating env vars that aren't being properly managed.
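As a side note on the Option 2 code: the `profile` property ends with `self.filter_null(profile)`, which drops unset optional fields (such as `s3_data_dir` when the connection doesn't define it) from the rendered profile. A minimal standalone sketch of that behavior (a hypothetical helper for illustration, not Cosmos's actual implementation):

```python
from typing import Any


def filter_null(profile: dict[str, Any]) -> dict[str, Any]:
    # Drop keys whose value is None, so optional connection fields
    # that were never set don't leak into the rendered dbt profile.
    return {key: value for key, value in profile.items() if value is not None}


rendered = filter_null(
    {
        "schema": "analytics",            # required field, kept
        "s3_data_dir": None,              # optional field left unset, dropped
        "aws_session_token": "dummy-token",  # placeholder value for illustration
    }
)
```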
Thanks for the explanation and details, @benjamin-awd! I remember facing the STS token refresh issues when syncing data from AWS to GCP, and it wasn't fun. It's the same here: most tools don't support automatically generating/refreshing these tokens. I also agree that option (2) seems the best way to move forward. So that we can move forward, we'll merge #663 once the checks pass and log a separate issue to address automatic token refresh (2). Since you came up with this suggestion, would you like to log the issue?
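For the follow-up issue on automatic token refresh, the core decision is whether the cached STS credentials are close enough to their `Expiration` timestamp to be regenerated before the next dbt invocation. A hedged sketch of that check (the function name and the five-minute safety skew are assumptions, not part of any Cosmos or boto3 API):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_refresh(
    expiration: datetime,
    now: Optional[datetime] = None,
    skew: timedelta = timedelta(minutes=5),
) -> bool:
    # Treat credentials as stale slightly before the STS Expiration
    # timestamp, so a task never starts with about-to-expire keys.
    now = now or datetime.now(timezone.utc)
    return expiration - now <= skew


# Fixed "now" so the example is deterministic:
now = datetime(2023, 10, 18, 12, 0, tzinfo=timezone.utc)
fresh = needs_refresh(now + timedelta(minutes=30), now=now)  # plenty of time left
stale = needs_refresh(now + timedelta(minutes=2), now=now)   # inside the skew window
```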
Adds the `aws_session_token` argument to Athena, which was added to dbt-athena 1.6.4 in dbt-labs/dbt-athena#459.

Closes: #609

Also addresses this comment: #578 (comment)
Unfortunately stuck on this for now. I don't have scope at the moment to work on this, but hopefully might have some bandwidth over the next couple of months. My gut feeling is that the memory issues are caused by the repeated creation of the boto3 client across multiple threads, since it seems that boto3 is not very memory-efficient (boto/boto3#1670, boto/boto3#3541). I think using the
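If the repeated per-thread client creation is indeed the cause (an assumption, not a verified diagnosis), one mitigation would be to build the STS client once per process and share it, since boto3 low-level clients are documented as thread-safe while client construction is comparatively heavy. A generic double-checked caching sketch, where the `factory` callable stands in for `lambda: boto3.client("sts")`:

```python
import threading
from typing import Any, Callable

_lock = threading.Lock()
_cached_client: Any = None


def get_shared_client(factory: Callable[[], Any]) -> Any:
    """Create the client at most once per process and reuse it afterwards."""
    global _cached_client
    if _cached_client is None:          # fast path: already built
        with _lock:
            if _cached_client is None:  # re-check under the lock
                _cached_client = factory()
    return _cached_client


# Illustration with a counting factory instead of boto3.client("sts"):
calls = []
client_a = get_shared_client(lambda: calls.append(1) or object())
client_b = get_shared_client(lambda: calls.append(1) or object())
```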
Raised an issue here: #691
Once dbt-athena supports `aws_session_token` (https://github.com/dbt-athena/dbt-athena/pull/459/files by @benjamin-awd), we can create a profile mapping to expose this feature in Cosmos.

This is a follow-up to the #airflow-dbt conversation:
https://apache-airflow.slack.com/archives/C059CC42E9W/p1697600048273149?thread_ts=1696579576.364589&cid=C059CC42E9W