Skip to content

Commit

Permalink
Bugfix: Remove df-cli specific SQL statment options before executing …
Browse files Browse the repository at this point in the history
…with DataFusion (#8426)

* remove df-cli specific options from create external table options

* add test and comments

* cargo fmt

* merge main

* cargo toml format
  • Loading branch information
devinjdangelo authored Dec 6, 2023
1 parent d9d8ddd commit 99bf509
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 34 deletions.
19 changes: 10 additions & 9 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@ url = "2.2"
[dev-dependencies]
assert_cmd = "2.0"
ctor = "0.2.0"
datafusion-common = { path = "../datafusion/common" }
predicates = "3.0"
rstest = "0.17"
31 changes: 25 additions & 6 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async fn exec_and_print(
})?;
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let plan = ctx.state().statement_to_plan(statement).await?;
let mut plan = ctx.state().statement_to_plan(statement).await?;

// For plans like `Explain` ignore `MaxRows` option and always display all rows
let should_ignore_maxrows = matches!(
Expand All @@ -221,10 +221,12 @@ async fn exec_and_print(
| LogicalPlan::Analyze(_)
);

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(ctx, cmd).await?;
}

let df = ctx.execute_logical_plan(plan).await?;
let results = df.collect().await?;

Expand All @@ -244,7 +246,7 @@ async fn exec_and_print(

async fn create_external_table(
ctx: &SessionContext,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<()> {
let table_path = ListingTableUrl::parse(&cmd.location)?;
let scheme = table_path.scheme();
Expand Down Expand Up @@ -285,15 +287,32 @@ async fn create_external_table(

#[cfg(test)]
mod tests {
use std::str::FromStr;

use super::*;
use datafusion::common::plan_err;
use datafusion_common::{file_options::StatementOptions, FileTypeWriterOptions};

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(sql).await?;
let mut plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(&ctx, cmd).await?;
let options: Vec<_> = cmd
.options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let statement_options = StatementOptions::new(options);
let file_type =
datafusion_common::FileType::from_str(cmd.file_type.as_str())?;

let _file_type_writer_options = FileTypeWriterOptions::build(
&file_type,
ctx.state().config_options(),
&statement_options,
)?;
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}
Expand Down
41 changes: 22 additions & 19 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,23 @@ use url::Url;

pub async fn get_s3_object_store_builder(
url: &Url,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

if let (Some(access_key_id), Some(secret_access_key)) = (
cmd.options.get("access_key_id"),
cmd.options.get("secret_access_key"),
// These options are datafusion-cli specific and must be removed before passing through to datafusion.
// Otherwise, a Configuration error will be raised.
cmd.options.remove("access_key_id"),
cmd.options.remove("secret_access_key"),
) {
println!("removing secret access key!");
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);

if let Some(session_token) = cmd.options.get("session_token") {
if let Some(session_token) = cmd.options.remove("session_token") {
builder = builder.with_token(session_token);
}
} else {
Expand All @@ -66,7 +69,7 @@ pub async fn get_s3_object_store_builder(
builder = builder.with_credentials(credentials);
}

if let Some(region) = cmd.options.get("region") {
if let Some(region) = cmd.options.remove("region") {
builder = builder.with_region(region);
}

Expand Down Expand Up @@ -99,7 +102,7 @@ impl CredentialProvider for S3CredentialProvider {

pub fn get_oss_object_store_builder(
url: &Url,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env()
Expand All @@ -109,15 +112,15 @@ pub fn get_oss_object_store_builder(
.with_region("do_not_care");

if let (Some(access_key_id), Some(secret_access_key)) = (
cmd.options.get("access_key_id"),
cmd.options.get("secret_access_key"),
cmd.options.remove("access_key_id"),
cmd.options.remove("secret_access_key"),
) {
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);
}

if let Some(endpoint) = cmd.options.get("endpoint") {
if let Some(endpoint) = cmd.options.remove("endpoint") {
builder = builder.with_endpoint(endpoint);
}

Expand All @@ -126,21 +129,21 @@ pub fn get_oss_object_store_builder(

pub fn get_gcs_object_store_builder(
url: &Url,
cmd: &CreateExternalTable,
cmd: &mut CreateExternalTable,
) -> Result<GoogleCloudStorageBuilder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);

if let Some(service_account_path) = cmd.options.get("service_account_path") {
if let Some(service_account_path) = cmd.options.remove("service_account_path") {
builder = builder.with_service_account_path(service_account_path);
}

if let Some(service_account_key) = cmd.options.get("service_account_key") {
if let Some(service_account_key) = cmd.options.remove("service_account_key") {
builder = builder.with_service_account_key(service_account_key);
}

if let Some(application_credentials_path) =
cmd.options.get("application_credentials_path")
cmd.options.remove("application_credentials_path")
{
builder = builder.with_application_credentials(application_credentials_path);
}
Expand Down Expand Up @@ -180,9 +183,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'region' '{region}', 'session_token' {session_token}) LOCATION '{location}'");

let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(&sql).await?;
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
let builder = get_s3_object_store_builder(table_url.as_ref(), cmd).await?;
// get the actual configuration information, then assert_eq!
let config = [
Expand Down Expand Up @@ -212,9 +215,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'endpoint' '{endpoint}') LOCATION '{location}'");

let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(&sql).await?;
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
let builder = get_oss_object_store_builder(table_url.as_ref(), cmd)?;
// get the actual configuration information, then assert_eq!
let config = [
Expand Down Expand Up @@ -244,9 +247,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('service_account_path' '{service_account_path}', 'service_account_key' '{service_account_key}', 'application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");

let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(&sql).await?;
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
let builder = get_gcs_object_store_builder(table_url.as_ref(), cmd)?;
// get the actual configuration information, then assert_eq!
let config = [
Expand Down

0 comments on commit 99bf509

Please sign in to comment.