From b2eb0986ed9e01508b0f82f97ae49b61eca49990 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 4 Nov 2024 17:07:22 +0800 Subject: [PATCH 1/3] disable `CountWildcardRule` for bigquery --- wren-core/core/src/mdl/context.rs | 5 +++-- wren-core/core/src/mdl/mod.rs | 12 ++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/wren-core/core/src/mdl/context.rs b/wren-core/core/src/mdl/context.rs index b87d0690f..21a40582f 100644 --- a/wren-core/core/src/mdl/context.rs +++ b/wren-core/core/src/mdl/context.rs @@ -17,7 +17,6 @@ use datafusion::common::Result; use datafusion::datasource::{TableProvider, TableType, ViewTable}; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::Expr; -use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule; use datafusion::optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule; use datafusion::optimizer::analyzer::inline_table_scan::InlineTableScan; use datafusion::optimizer::analyzer::type_coercion::TypeCoercion; @@ -88,7 +87,9 @@ pub async fn create_ctx_with_mdl( Arc::new(ExpandWildcardRule::new()), // [Expr::Wildcard] should be expanded before [TypeCoercion] Arc::new(TypeCoercion::new()), - Arc::new(CountWildcardRule::new()), + // Disable it to avoid generate the alias name, `count(*)` because BigQuery doesn't allow + // the special character `*` in the alias name + // Arc::new(CountWildcardRule::new()), ]); let new_state = if is_local_runtime { diff --git a/wren-core/core/src/mdl/mod.rs b/wren-core/core/src/mdl/mod.rs index d768caa81..ecf2ca06f 100644 --- a/wren-core/core/src/mdl/mod.rs +++ b/wren-core/core/src/mdl/mod.rs @@ -857,6 +857,18 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_disable_count_wildcard_rule() -> Result<()> { + let ctx = SessionContext::new(); + + let analyzed_mdl = Arc::new(AnalyzedWrenMDL::default()); + let sql = "select count(*) from (select 1)"; + let actual = + transform_sql_with_ctx(&ctx, Arc::clone(&analyzed_mdl), &[], sql).await?; + assert_eq!(actual, "SELECT count(*) FROM (SELECT 1)"); + Ok(()) + } + async fn assert_sql_valid_executable(sql: &str) -> Result<()> { let ctx = SessionContext::new(); // To roundtrip testing, we should register the mock table for the planned sql. From 4a956dc41b53f546559a0513b4a3440f8088af5f Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 4 Nov 2024 17:34:36 +0800 Subject: [PATCH 2/3] split unparse purpose and local run purpose rules --- wren-core/core/src/mdl/context.rs | 83 +++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 20 deletions(-) diff --git a/wren-core/core/src/mdl/context.rs b/wren-core/core/src/mdl/context.rs index 21a40582f..300d7e7ae 100644 --- a/wren-core/core/src/mdl/context.rs +++ b/wren-core/core/src/mdl/context.rs @@ -7,7 +7,7 @@ use crate::logical_plan::analyze::model_anlayze::ModelAnalyzeRule; use crate::logical_plan::analyze::model_generation::ModelGenerationRule; use crate::logical_plan::utils::create_schema; use crate::mdl::manifest::Model; -use crate::mdl::{AnalyzedWrenMDL, WrenMDL}; +use crate::mdl::{AnalyzedWrenMDL, SessionStateRef, WrenMDL}; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::catalog::Session; @@ -17,6 +17,7 @@ use datafusion::common::Result; use datafusion::datasource::{TableProvider, TableType, ViewTable}; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::Expr; +use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule; use datafusion::optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule; use datafusion::optimizer::analyzer::inline_table_scan::InlineTableScan; use datafusion::optimizer::analyzer::type_coercion::TypeCoercion; @@ -40,7 +41,7 @@ use datafusion::optimizer::rewrite_disjunctive_predicate::RewriteDisjunctivePred use datafusion::optimizer::scalar_subquery_to_join::ScalarSubqueryToJoin; use datafusion::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy; use datafusion::optimizer::unwrap_cast_in_comparison::UnwrapCastInComparison; -use datafusion::optimizer::OptimizerRule; +use datafusion::optimizer::{AnalyzerRule, OptimizerRule}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion::sql::TableReference; @@ -67,20 +68,74 @@ pub async fn create_ctx_with_mdl( let new_state = SessionStateBuilder::new_from_existing( reset_default_catalog_schema.clone().read().deref().clone(), - ) - .with_analyzer_rules(vec![ + ); + + let new_state = if is_local_runtime { + new_state.with_analyzer_rules(analyze_rule_for_local_runtime( + Arc::clone(&analyzed_mdl), + reset_default_catalog_schema.clone(), + )) + // The plan will be executed locally, so apply the default optimizer rules + } else { + new_state.with_analyzer_rules(analyze_rule_for_unparsing( + Arc::clone(&analyzed_mdl), + reset_default_catalog_schema.clone(), + )) + .with_optimizer_rules(optimize_rule_for_unparsing()) + }; + + let new_state = new_state.with_config(config).build(); + let ctx = SessionContext::new_with_state(new_state); + register_table_with_mdl(&ctx, analyzed_mdl.wren_mdl()).await?; + Ok(ctx) +} + +// Analyzer rules for local runtime +fn analyze_rule_for_local_runtime( + analyzed_mdl: Arc, + session_state_ref: SessionStateRef, +) -> Vec> { + vec![ // expand the view should be the first rule Arc::new(ExpandWrenViewRule::new( Arc::clone(&analyzed_mdl), - Arc::clone(&reset_default_catalog_schema), + Arc::clone(&session_state_ref), )), Arc::new(ModelAnalyzeRule::new( Arc::clone(&analyzed_mdl), - Arc::clone(&reset_default_catalog_schema), + Arc::clone(&session_state_ref), )), Arc::new(ModelGenerationRule::new( Arc::clone(&analyzed_mdl), - reset_default_catalog_schema, + session_state_ref, + )), + Arc::new(InlineTableScan::new()), + // Every rule that will generate [Expr::Wildcard] should be placed in front of [ExpandWildcardRule]. + Arc::new(ExpandWildcardRule::new()), + // [Expr::Wildcard] should be expanded before [TypeCoercion] + Arc::new(TypeCoercion::new()), + Arc::new(CountWildcardRule::new()), + ] +} + +// Analyze rules for local runtime +fn analyze_rule_for_unparsing( + analyzed_mdl: Arc, + session_state_ref: SessionStateRef, +) -> Vec> { + vec![ + // expand the view should be the first rule + Arc::new(ExpandWrenViewRule::new( + Arc::clone(&analyzed_mdl), + Arc::clone(&session_state_ref), + )), + Arc::new(ModelAnalyzeRule::new( + Arc::clone(&analyzed_mdl), + Arc::clone(&session_state_ref), + )), + Arc::new(ModelGenerationRule::new( + Arc::clone(&analyzed_mdl), + session_state_ref, )), Arc::new(InlineTableScan::new()), // Every rule that will generate [Expr::Wildcard] should be placed in front of [ExpandWildcardRule]. @@ -90,19 +145,7 @@ pub async fn create_ctx_with_mdl( // Disable it to avoid generate the alias name, `count(*)` because BigQuery doesn't allow // the special character `*` in the alias name // Arc::new(CountWildcardRule::new()), - ]); - - let new_state = if is_local_runtime { - // The plan will be executed locally, so apply the default optimizer rules - new_state - } else { - new_state.with_optimizer_rules(optimize_rule_for_unparsing()) - }; - - let new_state = new_state.with_config(config).build(); - let ctx = SessionContext::new_with_state(new_state); - register_table_with_mdl(&ctx, analyzed_mdl.wren_mdl()).await?; - Ok(ctx) + ] } /// Optimizer rules for unparse From 2dedbf9a1c9e69d6ef750ae435f499a40eab78a3 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 4 Nov 2024 17:35:47 +0800 Subject: [PATCH 3/3] cargo fmt --- wren-core/core/src/mdl/context.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/wren-core/core/src/mdl/context.rs b/wren-core/core/src/mdl/context.rs index 300d7e7ae..cd0c38be3 100644 --- a/wren-core/core/src/mdl/context.rs +++ b/wren-core/core/src/mdl/context.rs @@ -77,11 +77,12 @@ pub async fn create_ctx_with_mdl( )) // The plan will be executed locally, so apply the default optimizer rules } else { - new_state.with_analyzer_rules(analyze_rule_for_unparsing( - Arc::clone(&analyzed_mdl), - reset_default_catalog_schema.clone(), - )) - .with_optimizer_rules(optimize_rule_for_unparsing()) + new_state + .with_analyzer_rules(analyze_rule_for_unparsing( + Arc::clone(&analyzed_mdl), + reset_default_catalog_schema.clone(), + )) + .with_optimizer_rules(optimize_rule_for_unparsing()) }; let new_state = new_state.with_config(config).build();