From 63b976da69d58f5de48e4f254a6816176ac0ccca Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 6 Jun 2025 07:41:26 -0400 Subject: [PATCH 1/6] Support datafusion-cli access to public S3 buckets that do not require authentication --- Cargo.lock | 1 + datafusion-cli/Cargo.toml | 1 + datafusion-cli/src/object_storage.rs | 287 ++++++++++++++-------- docs/source/user-guide/cli/datasources.md | 66 +++-- 4 files changed, 232 insertions(+), 123 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 77f0e8d1e350..28b4b10122f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1962,6 +1962,7 @@ dependencies = [ "futures", "insta", "insta-cmd", + "log", "mimalloc", "object_store", "parking_lot", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 2eec93628b52..40a9b28f272d 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -55,6 +55,7 @@ datafusion = { workspace = true, features = [ dirs = "6.0.0" env_logger = { workspace = true } futures = { workspace = true } +log = "0.4.27" mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "gcp", "http"] } parking_lot = { workspace = true } diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index c31310093ac6..66bf94c54829 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -28,7 +28,8 @@ use datafusion::execution::context::SessionState; use async_trait::async_trait; use aws_config::BehaviorVersion; -use aws_credential_types::provider::ProvideCredentials; +use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider}; +use log::debug; use object_store::aws::{AmazonS3Builder, AwsCredential}; use object_store::gcp::GoogleCloudStorageBuilder; use object_store::http::HttpBuilder; @@ -46,6 +47,7 @@ pub async fn get_s3_object_store_builder( region, endpoint, allow_http, + skip_signature, } = aws_options; let bucket_name = get_bucket_name(url)?; @@ -54,6 +56,7 @@ pub async fn get_s3_object_store_builder( if let (Some(access_key_id), Some(secret_access_key)) = (access_key_id, secret_access_key) { + debug!("Using explicitly provided S3 access_key_id and secret_access_key"); builder = builder .with_access_key_id(access_key_id) .with_secret_access_key(secret_access_key); @@ -62,23 +65,21 @@ pub async fn get_s3_object_store_builder( builder = builder.with_token(session_token); } } else { - let config = aws_config::defaults(BehaviorVersion::latest()).load().await; - if let Some(region) = config.region() { - builder = builder.with_region(region.to_string()); + debug!("Using AWS S3 SDK to determine credentials"); + let CredentialsFromConfig { + region, + credentials, + } = CredentialsFromConfig::try_new().await?; + if let Some(region) = region { + builder = builder.with_region(region); + } + if let Some(credentials) = credentials { + let credentials = Arc::new(S3CredentialProvider { credentials }); + builder = builder.with_credentials(credentials); + } else { + debug!("No credentials found, defaulting to skip signature "); + builder = builder.with_skip_signature(true); } - - let credentials = config - .credentials_provider() - .ok_or_else(|| { - DataFusionError::ObjectStore(object_store::Error::Generic { - store: "S3", - source: "Failed to get S3 credentials from the environment".into(), - }) - })? 
-            .clone();
-
-        let credentials = Arc::new(S3CredentialProvider { credentials });
-        builder = builder.with_credentials(credentials);
     }
 
     if let Some(region) = region {
@@ -105,9 +106,52 @@
         builder = builder.with_allow_http(*allow_http);
     }
 
+    if let Some(skip_signature) = skip_signature {
+        builder = builder.with_skip_signature(*skip_signature);
+    }
+
     Ok(builder)
 }
 
+/// Credentials from the AWS SDK
+struct CredentialsFromConfig {
+    region: Option<String>,
+    credentials: Option<SharedCredentialsProvider>,
+}
+
+impl CredentialsFromConfig {
+    /// Attempt to find AWS S3 credentials via the AWS SDK
+    pub async fn try_new() -> Result<Self> {
+        let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
+        let region = config.region().map(|r| r.to_string());
+
+        let credentials = config
+            .credentials_provider()
+            .ok_or_else(|| {
+                DataFusionError::ObjectStore(object_store::Error::Generic {
+                    store: "S3",
+                    source: "Failed to get S3 credentials from aws_config".into(),
+                })
+            })?
+            .clone();
+
+        // The credential provider is lazy, so it does not fetch credentials
+        // until they are needed. To ensure that the credentials are valid,
+        // we can call `provide_credentials` here.
+        let credentials = match credentials.provide_credentials().await {
+            Ok(_) => Some(credentials),
+            Err(e) => {
+                debug!("Could not use AWS SDK to get credentials: {e}");
+                None
+            }
+        };
+        Ok(Self {
+            region,
+            credentials,
+        })
+    }
+}
+
 #[derive(Debug)]
 struct S3CredentialProvider {
     credentials: aws_credential_types::provider::SharedCredentialsProvider,
@@ -219,6 +263,11 @@ pub struct AwsOptions {
     pub endpoint: Option<String>,
     /// Allow HTTP (otherwise will always use https)
     pub allow_http: Option<bool>,
+    /// Do not fetch credentials and do not sign requests
+    ///
+    /// This can be useful when interacting with public S3 buckets that deny
+    /// authorized requests
+    pub skip_signature: Option<bool>,
 }
 
 impl ExtensionOptions for AwsOptions {
@@ -256,6 +305,9 @@
             "allow_http" => {
                 self.allow_http.set(rem, value)?;
             }
+            "skip_signature" | "nosign" => {
+                self.skip_signature.set(rem, value)?;
+            }
             _ => {
                 return config_err!("Config value \"{}\" not found on AwsOptions", rem);
             }
@@ -461,7 +513,6 @@ mod tests {
 
     use super::*;
 
-    use datafusion::common::plan_err;
     use datafusion::{
         datasource::listing::ListingTableUrl,
         logical_expr::{DdlStatement, LogicalPlan},
@@ -470,6 +521,38 @@
 
     use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};
 
+    #[tokio::test]
+    async fn s3_object_store_builder_default() -> Result<()> {
+        let location = "s3://bucket/path/FAKE/file.parquet";
+
+        // No options
+        let table_url = ListingTableUrl::parse(location)?;
+        let scheme = table_url.scheme();
+        let sql =
+            format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
+
+        let ctx = SessionContext::new();
+        ctx.register_table_options_extension_from_scheme(scheme);
+        let table_options = get_table_options(&ctx, &sql).await;
+        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+        let builder =
+            get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
+        // get the actual configuration information, then assert_eq!
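+        // With no OPTIONS given and no credentials found in the environment,
+        // no credential-related keys should be set on the builder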
+        assert_eq!(
+            builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
+            None
+        );
+        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Region), None);
+        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Endpoint), None);
+        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
+        // Default is to skip signature when no credentials are provided
+        assert_eq!(
+            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
+            Some("true".into())
+        );
+        Ok(())
+    }
+
     #[tokio::test]
     async fn s3_object_store_builder() -> Result<()> {
         // "fake" is uppercase to ensure the values are not lowercased when parsed
@@ -493,29 +576,27 @@
         );
 
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
-
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
-            ctx.register_table_options_extension_from_scheme(scheme);
-            let mut table_options = ctx.state().default_table_options();
-            table_options.alter_with_string_hash_map(&cmd.options)?;
-            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
-            let builder =
-                get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
-            // get the actual configuration information, then assert_eq!
-            let config = [
-                (AmazonS3ConfigKey::AccessKeyId, access_key_id),
-                (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
-                (AmazonS3ConfigKey::Region, region),
-                (AmazonS3ConfigKey::Endpoint, endpoint),
-                (AmazonS3ConfigKey::Token, session_token),
-            ];
-            for (key, value) in config {
-                assert_eq!(value, builder.get_config_value(&key).unwrap());
-            }
-        } else {
-            return plan_err!("LogicalPlan is not a CreateExternalTable");
+        ctx.register_table_options_extension_from_scheme(scheme);
+        let table_options = get_table_options(&ctx, &sql).await;
+        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+        let builder =
+            get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
+        // get the actual configuration information, then assert_eq!
+        let config = [
+            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
+            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
+            (AmazonS3ConfigKey::Region, region),
+            (AmazonS3ConfigKey::Endpoint, endpoint),
+            (AmazonS3ConfigKey::Token, session_token),
+        ];
+        for (key, value) in config {
+            assert_eq!(value, builder.get_config_value(&key).unwrap());
         }
+        // Should not skip signature when credentials are provided
+        assert_eq!(
+            builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
+            Some("false".into())
+        );
         Ok(())
     }
 
@@ -538,21 +619,15 @@
         );
 
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
-
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
-            ctx.register_table_options_extension_from_scheme(scheme);
-            let mut table_options = ctx.state().default_table_options();
-            table_options.alter_with_string_hash_map(&cmd.options)?;
-            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
-            let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
-                .await
-                .unwrap_err();
-
-            assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
-        } else {
-            return plan_err!("LogicalPlan is not a CreateExternalTable");
-        }
+        ctx.register_table_options_extension_from_scheme(scheme);
+
+        let table_options = get_table_options(&ctx, &sql).await;
+        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+        let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
+            .await
+            .unwrap_err();
+
+        assert_eq!(err.to_string().lines().next().unwrap_or_default(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
 
         // Now add `allow_http` to the options and check if it works
         let sql = format!(
             "CREATE EXTERNAL TABLE test STORED AS PARQUET
             OPTIONS(\
             'aws.allow_http' 'true'\
             ) LOCATION '{location}'"
         );
+        let table_options = get_table_options(&ctx, &sql).await;
 
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
-
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
-            ctx.register_table_options_extension_from_scheme(scheme);
-            let mut table_options = ctx.state().default_table_options();
-            table_options.alter_with_string_hash_map(&cmd.options)?;
-            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
-            // ensure this isn't an error
-            get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
-        } else {
-            return plan_err!("LogicalPlan is not a CreateExternalTable");
-        }
+        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+        // ensure this isn't an error
+        get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
 
         Ok(())
     }
@@ -592,25 +659,19 @@
         let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'");
 
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
-
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
-            ctx.register_table_options_extension_from_scheme(scheme);
-            let mut table_options = ctx.state().default_table_options();
-            table_options.alter_with_string_hash_map(&cmd.options)?;
-            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
-            let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
-            // get the actual configuration information, then assert_eq!
-            let config = [
-                (AmazonS3ConfigKey::AccessKeyId, access_key_id),
-                (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
-                (AmazonS3ConfigKey::Endpoint, endpoint),
-            ];
-            for (key, value) in config {
-                assert_eq!(value, builder.get_config_value(&key).unwrap());
-            }
-        } else {
-            return plan_err!("LogicalPlan is not a CreateExternalTable");
+        ctx.register_table_options_extension_from_scheme(scheme);
+        let table_options = get_table_options(&ctx, &sql).await;
+
+        let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+        let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
+        // get the actual configuration information, then assert_eq!
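+        // `aws.oss.endpoint` set in the OPTIONS above is surfaced as the
+        // builder's generic `Endpoint` key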
+        let config = [
+            (AmazonS3ConfigKey::AccessKeyId, access_key_id),
+            (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
+            (AmazonS3ConfigKey::Endpoint, endpoint),
+        ];
+        for (key, value) in config {
+            assert_eq!(value, builder.get_config_value(&key).unwrap());
         }
 
         Ok(())
     }
@@ -629,30 +690,40 @@
         let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");
 
         let ctx = SessionContext::new();
-        let mut plan = ctx.state().create_logical_plan(&sql).await?;
-
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
-            ctx.register_table_options_extension_from_scheme(scheme);
-            let mut table_options = ctx.state().default_table_options();
-            table_options.alter_with_string_hash_map(&cmd.options)?;
-            let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
-            let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
-            // get the actual configuration information, then assert_eq!
-            let config = [
-                (GoogleConfigKey::ServiceAccount, service_account_path),
-                (GoogleConfigKey::ServiceAccountKey, service_account_key),
-                (
-                    GoogleConfigKey::ApplicationCredentials,
-                    application_credentials_path,
-                ),
-            ];
-            for (key, value) in config {
-                assert_eq!(value, builder.get_config_value(&key).unwrap());
-            }
-        } else {
-            return plan_err!("LogicalPlan is not a CreateExternalTable");
+        ctx.register_table_options_extension_from_scheme(scheme);
+        let table_options = get_table_options(&ctx, &sql).await;
+
+        let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
+        let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
+        // get the actual configuration information, then assert_eq!
+        let config = [
+            (GoogleConfigKey::ServiceAccount, service_account_path),
+            (GoogleConfigKey::ServiceAccountKey, service_account_key),
+            (
+                GoogleConfigKey::ApplicationCredentials,
+                application_credentials_path,
+            ),
+        ];
+        for (key, value) in config {
+            assert_eq!(value, builder.get_config_value(&key).unwrap());
         }
 
         Ok(())
     }
+
+    /// Plans the `CREATE EXTERNAL TABLE` SQL statement and returns the
+    /// resulting resolved `TableOptions`.
+    async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions {
+        let mut plan = ctx.state().create_logical_plan(&sql).await.unwrap();
+
+        let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else {
+            panic!("plan is not a CreateExternalTable");
+        };
+
+        let mut table_options = ctx.state().default_table_options();
+        table_options
+            .alter_with_string_hash_map(&cmd.options)
+            .unwrap();
+        table_options
+    }
 }
diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md
index 2e14f1f54c6c..2eac86353f37 100644
--- a/docs/source/user-guide/cli/datasources.md
+++ b/docs/source/user-guide/cli/datasources.md
@@ -82,22 +82,29 @@ select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_par
 
 To read from an AWS S3 or GCS, use `s3` or `gs` as a protocol prefix. For
 example, to read a file in an S3 bucket named `my-data-bucket` use the URL
 `s3://my-data-bucket`and set the relevant access credentials as environmental
-variables (e.g. for AWS S3 you need to at least `AWS_ACCESS_KEY_ID` and
+variables (e.g. for AWS S3 you can use `AWS_ACCESS_KEY_ID` and
 `AWS_SECRET_ACCESS_KEY`).
```sql -select count(*) from 's3://my-data-bucket/athena_partitioned/hits.parquet' +> select count(*) from 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/'; ++------------+ +| count(*) | ++------------+ +| 1310903963 | ++------------+ ``` -See the [`CREATE EXTERNAL TABLE`](#create-external-table) section for +See the [`CREATE EXTERNAL TABLE`](#create-external-table) section below for additional configuration options. # `CREATE EXTERNAL TABLE` It is also possible to create a table backed by files or remote locations via -`CREATE EXTERNAL TABLE` as shown below. Note that DataFusion does not support wildcards (e.g. `*`) in file paths; instead, specify the directory path directly to read all compatible files in that directory. +`CREATE EXTERNAL TABLE` as shown below. Note that DataFusion does not support +wildcards (e.g. `*`) in file paths; instead, specify the directory path directly +to read all compatible files in that directory. -For example, to create a table `hits` backed by a local parquet file, use: +For example, to create a table `hits` backed by a local parquet file named `hits.parquet`: ```sql CREATE EXTERNAL TABLE hits @@ -105,7 +112,7 @@ STORED AS PARQUET LOCATION 'hits.parquet'; ``` -To create a table `hits` backed by a remote parquet file via HTTP(S), use +To create a table `hits` backed by a remote parquet file via HTTP(S): ```sql CREATE EXTERNAL TABLE hits @@ -127,7 +134,11 @@ select count(*) from hits; **Why Wildcards Are Not Supported** -Although wildcards (e.g., _.parquet or \*\*/_.parquet) may work for local filesystems in some cases, they are not officially supported by DataFusion. This is because wildcards are not universally applicable across all storage backends (e.g., S3, GCS). Instead, DataFusion expects the user to specify the directory path, and it will automatically read all compatible files within that directory. +Although wildcards (e.g., _.parquet or \*\*/_.parquet) may work for local +filesystems in some cases, they are not supported by DataFusion CLI. This +is because wildcards are not universally applicable across all storage backends +(e.g., S3, GCS). Instead, DataFusion expects the user to specify the directory +path, and it will automatically read all compatible files within that directory. For example, the following usage is not supported: @@ -148,7 +159,7 @@ CREATE EXTERNAL TABLE test ( day DATE ) STORED AS PARQUET -LOCATION 'gs://bucket/my_table'; +LOCATION 'gs://bucket/my_table/'; ``` # Formats @@ -168,6 +179,8 @@ LOCATION '/mnt/nyctaxi/tripdata.parquet'; Register a single folder parquet datasource. Note: All files inside must be valid parquet files and have compatible schemas +!! alert: The path must end in `/` otherwise DataFusion will treat the path as a file and not a directory + ```sql CREATE EXTERNAL TABLE taxi STORED AS PARQUET @@ -178,7 +191,7 @@ LOCATION '/mnt/nyctaxi/'; DataFusion will infer the CSV schema automatically or you can provide it explicitly. -Register a single file csv datasource with a header row. +Register a single file csv datasource with a header row: ```sql CREATE EXTERNAL TABLE test @@ -187,7 +200,7 @@ LOCATION '/path/to/aggregate_test_100.csv' OPTIONS ('has_header' 'true'); ``` -Register a single file csv datasource with explicitly defined schema. 
+Register a single file csv datasource with explicitly defined schema:
 
 ```sql
 CREATE EXTERNAL TABLE test (
@@ -213,7 +226,7 @@ LOCATION '/path/to/aggregate_test_100.csv';
 
 ## HTTP(s)
 
-To read from a remote parquet file via HTTP(S) you can use the following:
+To read from a remote parquet file via HTTP(S):
 
 ```sql
 CREATE EXTERNAL TABLE hits
@@ -223,9 +236,12 @@ LOCATION 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hit
 
 ## S3
 
-[AWS S3](https://aws.amazon.com/s3/) data sources must have connection credentials configured.
+DataFusion CLI supports configuring [AWS S3](https://aws.amazon.com/s3/) via the
+`CREATE EXTERNAL TABLE` statement and standard AWS configuration methods (via the
+[`aws-config`] AWS SDK crate).
 
-To create an external table from a file in an S3 bucket:
+To create an external table from a file in an S3 bucket with explicit
+credentials:
 
 ```sql
 CREATE EXTERNAL TABLE test
@@ -238,7 +254,7 @@
 LOCATION 's3://bucket/path/file.parquet';
 ```
 
-It is also possible to specify the access information using environment variables:
+To create an external table using environment variables:
 
 ```bash
 $ export AWS_DEFAULT_REGION=us-east-2
@@ -247,7 +263,7 @@ $ export AWS_ACCESS_KEY_ID=******
 
 $ datafusion-cli
 `datafusion-cli v21.0.0
-> create external table test stored as parquet location 's3://bucket/path/file.parquet';
+> CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION 's3://bucket/path/file.parquet';
 0 rows in set. Query took 0.374 seconds.
 > select * from test;
 +----------+----------+
 1 row in set. Query took 0.171 seconds.
 ```
 
+To read from a public S3 bucket without signatures, use the
+`aws.SKIP_SIGNATURE` option:
+
+```sql
+CREATE EXTERNAL TABLE nyc_taxi_rides
+STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/'
+OPTIONS(aws.SKIP_SIGNATURE true);
+```
+
+Credentials are taken in this order of precedence:
+
+1. Explicitly specified in the `OPTIONS` clause of the `CREATE EXTERNAL TABLE` statement.
+2. Determined by the [`aws-config`] crate (standard environment variables such as `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` as well as other AWS-specific features).
+
+If no credentials are specified, DataFusion CLI will use unsigned requests to S3,
+which allows reading from public buckets.
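+
+The `aws.nosign` option is accepted as an alias for `aws.skip_signature`. As a
+sketch, this is equivalent to the example above:
+
+```sql
+CREATE EXTERNAL TABLE nyc_taxi_rides
+STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/'
+OPTIONS(aws.nosign true);
+```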
+ Supported configuration options are: | Environment Variable | Configuration Option | Description | @@ -269,8 +302,11 @@ Supported configuration options are: | `AWS_SESSION_TOKEN` | `aws.token` | | | `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` | | See [IAM Roles] | | `AWS_ALLOW_HTTP` | | set to "true" to permit HTTP connections without TLS | +| `AWS_SKIP_SIGNATURE` | `aws.skip_signature` | If "true" does not sign requests | +| | `aws.nosign` | Alias for `skip_signature` | [iam roles]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html +[aws-config]: https://docs.rs/aws-config/latest/aws_config/ ## OSS From 659f6b17323f33e0f7fb67ce37e05837acd61321 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 6 Jun 2025 09:03:22 -0400 Subject: [PATCH 2/6] improve docs --- docs/source/user-guide/cli/datasources.md | 29 +++++++++++++---------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md index 2eac86353f37..afc4f6c0c50f 100644 --- a/docs/source/user-guide/cli/datasources.md +++ b/docs/source/user-guide/cli/datasources.md @@ -179,7 +179,10 @@ LOCATION '/mnt/nyctaxi/tripdata.parquet'; Register a single folder parquet datasource. Note: All files inside must be valid parquet files and have compatible schemas -!! alert: The path must end in `/` otherwise DataFusion will treat the path as a file and not a directory +:::{note} +Paths must end in Slash `/` +: The path must end in `/` otherwise DataFusion will treat the path as a file and not a directory +::: ```sql CREATE EXTERNAL TABLE taxi @@ -293,20 +296,20 @@ which allows reading from public buckets. Supported configuration options are: -| Environment Variable | Configuration Option | Description | -| ---------------------------------------- | ----------------------- | ---------------------------------------------------- | -| `AWS_ACCESS_KEY_ID` | `aws.access_key_id` | | -| `AWS_SECRET_ACCESS_KEY` | `aws.secret_access_key` | | -| `AWS_DEFAULT_REGION` | `aws.region` | | -| `AWS_ENDPOINT` | `aws.endpoint` | | -| `AWS_SESSION_TOKEN` | `aws.token` | | -| `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` | | See [IAM Roles] | -| `AWS_ALLOW_HTTP` | | set to "true" to permit HTTP connections without TLS | -| `AWS_SKIP_SIGNATURE` | `aws.skip_signature` | If "true" does not sign requests | -| | `aws.nosign` | Alias for `skip_signature` | +| Environment Variable | Configuration Option | Description | +| ---------------------------------------- | ----------------------- | ---------------------------------------------- | +| `AWS_ACCESS_KEY_ID` | `aws.access_key_id` | | +| `AWS_SECRET_ACCESS_KEY` | `aws.secret_access_key` | | +| `AWS_DEFAULT_REGION` | `aws.region` | | +| `AWS_ENDPOINT` | `aws.endpoint` | | +| `AWS_SESSION_TOKEN` | `aws.token` | | +| `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` | | See [IAM Roles] | +| `AWS_ALLOW_HTTP` | | If "true", permit HTTP connections without TLS | +| `AWS_SKIP_SIGNATURE` | `aws.skip_signature` | If "true", does not sign requests | +| | `aws.nosign` | Alias for `skip_signature` | [iam roles]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html -[aws-config]: https://docs.rs/aws-config/latest/aws_config/ +[`aws-config`]: https://docs.rs/aws-config/latest/aws_config/ ## OSS From 2358874fd67705ce8c29311aa2befd88ef426d99 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 6 Jun 2025 12:35:04 -0400 Subject: [PATCH 3/6] clippy and test --- datafusion-cli/src/object_storage.rs | 35 
+++++++++++++++++++++++-----
 1 file changed, 29 insertions(+), 6 deletions(-)

diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 66bf94c54829..32f2c9735ef4 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -537,18 +537,41 @@ mod tests {
         let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
         let builder =
             get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
+
+        // If the environment variables are set (as they are in CI) use them
+        let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
+        let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
+        let expected_region = std::env::var("AWS_REGION").ok();
+        let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();
+
         // get the actual configuration information, then assert_eq!
         assert_eq!(
             builder.get_config_value(&AmazonS3ConfigKey::AccessKeyId),
-            None
+            expected_access_key_id
+        );
+        assert_eq!(
+            builder.get_config_value(&AmazonS3ConfigKey::SecretAccessKey),
+            expected_secret_access_key
         );
-        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Region), None);
-        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Endpoint), None);
-        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
         // Default is to skip signature when no credentials are provided
+        let expected_skip_signature =
+            if expected_access_key_id.is_none() && expected_secret_access_key.is_none() {
+                Some(String::from("true"))
+            } else {
+                None
+            };
+        assert_eq!(
+            builder.get_config_value(&AmazonS3ConfigKey::Region),
+            expected_region
+        );
+        assert_eq!(
+            builder.get_config_value(&AmazonS3ConfigKey::Endpoint),
+            expected_endpoint
+        );
+        assert_eq!(builder.get_config_value(&AmazonS3ConfigKey::Token), None);
         assert_eq!(
             builder.get_config_value(&AmazonS3ConfigKey::SkipSignature),
-            Some("true".into())
+            expected_skip_signature
         );
         Ok(())
     }
@@ -714,7 +737,7 @@ mod tests {
 
     /// Plans the `CREATE EXTERNAL TABLE` SQL statement and returns the
     /// resulting resolved `TableOptions`.
async fn get_table_options(ctx: &SessionContext, sql: &str) -> TableOptions { - let mut plan = ctx.state().create_logical_plan(&sql).await.unwrap(); + let mut plan = ctx.state().create_logical_plan(sql).await.unwrap(); let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan else { panic!("plan is not a CreateExternalTable"); From 718301634b2866a81a394a1558dbc2cb5daa71d8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 9 Jun 2025 10:18:32 -0400 Subject: [PATCH 4/6] Update test --- datafusion-cli/src/object_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 32f2c9735ef4..ace5775ac917 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -558,7 +558,7 @@ mod tests { if expected_access_key_id.is_none() && expected_secret_access_key.is_none() { Some(String::from("true")) } else { - None + Some(String::from("false")) }; assert_eq!( builder.get_config_value(&AmazonS3ConfigKey::Region), From dd62c2c6e355236dbef6efc8c9e6977bcd00f0d3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Jun 2025 00:25:09 -0400 Subject: [PATCH 5/6] Update datafusion-cli/Cargo.toml Co-authored-by: Dmitrii Blaginin --- datafusion-cli/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 40a9b28f272d..9387be972f59 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -55,7 +55,7 @@ datafusion = { workspace = true, features = [ dirs = "6.0.0" env_logger = { workspace = true } futures = { workspace = true } -log = "0.4.27" +log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "gcp", "http"] } parking_lot = { workspace = true } From 37d113222efee9812c7993b174201975291451c5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Jun 2025 00:49:28 -0400 Subject: [PATCH 6/6] Only use unsigned requests on `CredentialsError::CredentialsNotLoaded` --- datafusion-cli/src/object_storage.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index ace5775ac917..f203f9a008d3 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -28,6 +28,7 @@ use datafusion::execution::context::SessionState; use async_trait::async_trait; use aws_config::BehaviorVersion; +use aws_credential_types::provider::error::CredentialsError; use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider}; use log::debug; use object_store::aws::{AmazonS3Builder, AwsCredential}; @@ -140,10 +141,19 @@ impl CredentialsFromConfig { // we can call `provide_credentials` here. let credentials = match credentials.provide_credentials().await { Ok(_) => Some(credentials), - Err(e) => { - debug!("Could not use AWS SDK to get credentials: {e}"); + Err(CredentialsError::CredentialsNotLoaded(_)) => { + debug!("Could not use AWS SDK to get credentials"); None } + // other errors like `CredentialsError::InvalidConfiguration` + // should be returned to the user so they can be fixed + Err(e) => { + return Err(DataFusionError::ObjectStore(object_store::Error::Generic { + store: "S3", + source: format!("Error getting credentials from provider: {e}") + .into(), + })); + } }; Ok(Self { region,