From d68219f19a87c680870ad03312999c9e14198dd4 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 5 Sep 2024 19:59:13 +0800 Subject: [PATCH 1/2] add test Signed-off-by: jayzhan211 --- datafusion/sqllogictest/test_files/test1.slt | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/test1.slt diff --git a/datafusion/sqllogictest/test_files/test1.slt b/datafusion/sqllogictest/test_files/test1.slt new file mode 100644 index 000000000000..2dd1e1be0d0b --- /dev/null +++ b/datafusion/sqllogictest/test_files/test1.slt @@ -0,0 +1,18 @@ +statement ok +CREATE EXTERNAL TABLE hits +STORED AS PARQUET +LOCATION '../../benchmarks/data/hits.parquet'; + +query ITI +SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; +---- +1313338681122956954 (empty) 29097 +1907779576417363396 (empty) 25333 +2305303682471783379 (empty) 10597 +7982623143712728547 (empty) 6669 +7280399273658728997 (empty) 6408 +1090981537032625727 (empty) 6196 +5730251990344211405 (empty) 6019 +6018350421959114808 (empty) 5990 +835157184735512989 (empty) 5209 +770542365400669095 (empty) 4906 From 34fa7bd7df2a00f330acee0f04ddb9fb38af414f Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 5 Sep 2024 22:13:05 +0800 Subject: [PATCH 2/2] hack for low cardinality case Signed-off-by: jayzhan211 --- benchmarks/queries/clickbench/queries.sql | 43 +------ .../core/src/physical_optimizer/optimizer.rs | 4 +- .../src/physical_optimizer/sanity_checker.rs | 22 ++-- datafusion/core/src/physical_planner.rs | 118 +++++++++++------- .../src/aggregates/group_values/row.rs | 3 + .../physical-plan/src/aggregates/row_hash.rs | 1 + datafusion/physical-plan/src/projection.rs | 1 + datafusion/sqllogictest/test_files/test1.slt | 16 +++ datafusion/sqllogictest/test_files/test2.slt | 22 ++++ datafusion/sqllogictest/test_files/test3.slt | 72 +++++++++++ 10 files changed, 200 insertions(+), 102 deletions(-) create mode 
100644 datafusion/sqllogictest/test_files/test2.slt create mode 100644 datafusion/sqllogictest/test_files/test3.slt diff --git a/benchmarks/queries/clickbench/queries.sql b/benchmarks/queries/clickbench/queries.sql index 52e72e02e1e0..df0810ec1539 100644 --- a/benchmarks/queries/clickbench/queries.sql +++ b/benchmarks/queries/clickbench/queries.sql @@ -1,43 +1,2 @@ -SELECT COUNT(*) FROM hits; -SELECT COUNT(*) FROM hits WHERE "AdvEngineID" <> 0; -SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits; -SELECT AVG("UserID") FROM hits; -SELECT COUNT(DISTINCT "UserID") FROM hits; -SELECT COUNT(DISTINCT "SearchPhrase") FROM hits; -SELECT MIN("EventDate"::INT::DATE), MAX("EventDate"::INT::DATE) FROM hits; -SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC; -SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10; -SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10; -SELECT "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhoneModel" ORDER BY u DESC LIMIT 10; -SELECT "MobilePhone", "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhone", "MobilePhoneModel" ORDER BY u DESC LIMIT 10; -SELECT "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10; -SELECT "SearchEngineID", "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT "UserID", COUNT(*) FROM hits GROUP BY "UserID" ORDER BY COUNT(*) DESC LIMIT 10; SELECT "UserID", "SearchPhrase", COUNT(*) 
FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; -SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10; -SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; -SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449; -SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%'; -SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; -SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; -SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10; -SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10; -SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; -SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; -SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1), SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" + 4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6), SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" + 9), 
SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11), SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth" + 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("ResolutionWidth" + 17), SUM("ResolutionWidth" + 18), SUM("ResolutionWidth" + 19), SUM("ResolutionWidth" + 20), SUM("ResolutionWidth" + 21), SUM("ResolutionWidth" + 22), SUM("ResolutionWidth" + 23), SUM("ResolutionWidth" + 24), SUM("ResolutionWidth" + 25), SUM("ResolutionWidth" + 26), SUM("ResolutionWidth" + 27), SUM("ResolutionWidth" + 28), SUM("ResolutionWidth" + 29), SUM("ResolutionWidth" + 30), SUM("ResolutionWidth" + 31), SUM("ResolutionWidth" + 32), SUM("ResolutionWidth" + 33), SUM("ResolutionWidth" + 34), SUM("ResolutionWidth" + 35), SUM("ResolutionWidth" + 36), SUM("ResolutionWidth" + 37), SUM("ResolutionWidth" + 38), SUM("ResolutionWidth" + 39), SUM("ResolutionWidth" + 40), SUM("ResolutionWidth" + 41), SUM("ResolutionWidth" + 42), SUM("ResolutionWidth" + 43), SUM("ResolutionWidth" + 44), SUM("ResolutionWidth" + 45), SUM("ResolutionWidth" + 46), SUM("ResolutionWidth" + 47), SUM("ResolutionWidth" + 48), SUM("ResolutionWidth" + 49), SUM("ResolutionWidth" + 50), SUM("ResolutionWidth" + 51), SUM("ResolutionWidth" + 52), SUM("ResolutionWidth" + 53), SUM("ResolutionWidth" + 54), SUM("ResolutionWidth" + 55), SUM("ResolutionWidth" + 56), SUM("ResolutionWidth" + 57), SUM("ResolutionWidth" + 58), SUM("ResolutionWidth" + 59), SUM("ResolutionWidth" + 60), SUM("ResolutionWidth" + 61), SUM("ResolutionWidth" + 62), SUM("ResolutionWidth" + 63), SUM("ResolutionWidth" + 64), SUM("ResolutionWidth" + 65), SUM("ResolutionWidth" + 66), SUM("ResolutionWidth" + 67), SUM("ResolutionWidth" + 68), SUM("ResolutionWidth" + 69), SUM("ResolutionWidth" + 70), SUM("ResolutionWidth" + 71), SUM("ResolutionWidth" + 72), SUM("ResolutionWidth" + 73), SUM("ResolutionWidth" + 74), SUM("ResolutionWidth" + 75), SUM("ResolutionWidth" + 76), SUM("ResolutionWidth" + 77), SUM("ResolutionWidth" + 
78), SUM("ResolutionWidth" + 79), SUM("ResolutionWidth" + 80), SUM("ResolutionWidth" + 81), SUM("ResolutionWidth" + 82), SUM("ResolutionWidth" + 83), SUM("ResolutionWidth" + 84), SUM("ResolutionWidth" + 85), SUM("ResolutionWidth" + 86), SUM("ResolutionWidth" + 87), SUM("ResolutionWidth" + 88), SUM("ResolutionWidth" + 89) FROM hits; -SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10; -SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; -SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; -SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10; -SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC LIMIT 10; -SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10; -SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10; -SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10; -SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY 
PageViews DESC LIMIT 10 OFFSET 1000; -SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; -SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100; -SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; -SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000; +SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10; \ No newline at end of file diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index e09d7b28bf5f..7e67e52c4528 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -25,7 +25,7 
@@ use super::update_aggr_exprs::OptimizeAggregateOrder; use crate::physical_optimizer::aggregate_statistics::AggregateStatistics; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; -use crate::physical_optimizer::enforce_distribution::EnforceDistribution; +// use crate::physical_optimizer::enforce_distribution::EnforceDistribution; use crate::physical_optimizer::enforce_sorting::EnforceSorting; use crate::physical_optimizer::join_selection::JoinSelection; use crate::physical_optimizer::limit_pushdown::LimitPushdown; @@ -69,7 +69,7 @@ impl PhysicalOptimizer { // requirements. Please make sure that the whole plan tree is determined before this rule. // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at // least one of the operators in the plan benefits from increased parallelism. - Arc::new(EnforceDistribution::new()), + // Arc::new(EnforceDistribution::new()), // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule Arc::new(CombinePartialFinalAggregate::new()), // The EnforceSorting rule is for adding essential local sorting to satisfy the required diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs index bd80d31224ef..6868f9288b77 100644 --- a/datafusion/core/src/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs @@ -137,17 +137,17 @@ pub fn check_plan_sanity( } } - if !child - .output_partitioning() - .satisfy(child_dist_req, child_eq_props) - { - let child_plan_str = get_plan_string(child); - return plan_err!( - "Child: {:?} does not satisfy parent distribution requirements: {:?}", - child_plan_str, - child_dist_req - ); - } + // if !child + // .output_partitioning() + // .satisfy(child_dist_req, child_eq_props) + // { + // let child_plan_str = 
get_plan_string(child); + // return plan_err!( + // "Child: {:?} does not satisfy parent distribution requirements: {:?}", + // child_plan_str, + // child_dist_req + // ); + // } } Ok(Transformed::no(plan)) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 82405dd98e30..b8142889d1f6 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -698,55 +698,79 @@ impl DefaultPhysicalPlanner { let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter); - let initial_aggr = Arc::new(AggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates, - filters.clone(), - input_exec, - physical_input_schema.clone(), - )?); - - // update group column indices based on partial aggregate plan evaluation - let final_group: Vec> = - initial_aggr.output_group_expr(); - - let can_repartition = !groups.is_empty() - && session_state.config().target_partitions() > 1 - && session_state.config().repartition_aggregations(); - - // Some aggregators may be modified during initialization for - // optimization purposes. For example, a FIRST_VALUE may turn - // into a LAST_VALUE with the reverse ordering requirement. - // To reflect such changes to subsequent stages, use the updated - // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. 
- let updated_aggregates = initial_aggr.aggr_expr().to_vec(); - - let next_partition_mode = if can_repartition { - // construct a second aggregation with 'AggregateMode::FinalPartitioned' - AggregateMode::FinalPartitioned + // if false { + if group_expr.len() > 1 { + let can_repartition = !groups.is_empty() + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_aggregations(); + let mode = if can_repartition { + // run the whole aggregation in one stage with 'AggregateMode::SinglePartitioned' + AggregateMode::SinglePartitioned + } else { + // run the whole aggregation in one stage with 'AggregateMode::Single'; this branch + // builds no Partial/Final pair, so there is no second aggregation to configure + AggregateMode::Single + }; + let aggr = AggregateExec::try_new( + mode, + groups.clone(), + aggregates.clone(), + filters.clone(), + input_exec.clone(), + physical_input_schema.clone(), + )?; + Arc::new(aggr) } else { - // construct a second aggregation, keeping the final column name equal to the - // first aggregation and the expressions corresponding to the respective aggregate - AggregateMode::Final - }; + let initial_aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + groups.clone(), + aggregates, + filters.clone(), + input_exec, + physical_input_schema.clone(), + )?); - let final_grouping_set = PhysicalGroupBy::new_single( - final_group - .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone())) - .collect(), - ); - - Arc::new(AggregateExec::try_new( - next_partition_mode, - final_grouping_set, - updated_aggregates, - filters, - initial_aggr, - physical_input_schema.clone(), - )?) 
+ // update group column indices based on partial aggregate plan evaluation + let final_group: Vec> = + initial_aggr.output_group_expr(); + + let can_repartition = !groups.is_empty() + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_aggregations(); + + // Some aggregators may be modified during initialization for + // optimization purposes. For example, a FIRST_VALUE may turn + // into a LAST_VALUE with the reverse ordering requirement. + // To reflect such changes to subsequent stages, use the updated + // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. + let updated_aggregates = initial_aggr.aggr_expr().to_vec(); + + let next_partition_mode = if can_repartition { + // construct a second aggregation with 'AggregateMode::FinalPartitioned' + AggregateMode::FinalPartitioned + } else { + // construct a second aggregation, keeping the final column name equal to the + // first aggregation and the expressions corresponding to the respective aggregate + AggregateMode::Final + }; + + let final_grouping_set = PhysicalGroupBy::new_single( + final_group + .iter() + .enumerate() + .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone())) + .collect(), + ); + + Arc::new(AggregateExec::try_new( + next_partition_mode, + final_grouping_set, + updated_aggregates, + filters, + initial_aggr, + physical_input_schema.clone(), + )?) + } } LogicalPlan::Projection(Projection { input, expr, .. 
}) => self .create_project_physical_exec( diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index dc948e28bb2d..f9303dedaf7e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -100,6 +100,9 @@ impl GroupValuesRows { impl GroupValues for GroupValuesRows { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + + // println!("cols: {:?}", cols[0].len()); + // Convert the group keys into the row format let group_rows = &mut self.rows_buffer; group_rows.clear(); diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index c38137994d44..962148df7c00 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -762,6 +762,7 @@ impl GroupedHashAggregateStream { evaluate_optional(&self.filter_expressions, &batch)? 
}; + // println!("called: {:?}", self.mode); for group_values in &group_by_values { // calculate the group indices for each input row let starting_num_groups = self.group_values.len(); diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index d2bb8f2b0ead..1ff858f16b03 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -318,6 +318,7 @@ impl ProjectionStream { RecordBatch::try_new_with_options(Arc::clone(&self.schema), arrays, &options) .map_err(Into::into) } else { + // println!("arrays.len(): {:?}", arrays[0].len()); RecordBatch::try_new(Arc::clone(&self.schema), arrays).map_err(Into::into) } } diff --git a/datafusion/sqllogictest/test_files/test1.slt b/datafusion/sqllogictest/test_files/test1.slt index 2dd1e1be0d0b..922003bdf432 100644 --- a/datafusion/sqllogictest/test_files/test1.slt +++ b/datafusion/sqllogictest/test_files/test1.slt @@ -16,3 +16,19 @@ SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPh 6018350421959114808 (empty) 5990 835157184735512989 (empty) 5209 770542365400669095 (empty) 4906 + +query TT +explain SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; +---- +logical_plan +01)Sort: count(*) DESC NULLS FIRST, fetch=10 +02)--Aggregate: groupBy=[[hits.UserID, hits.SearchPhrase]], aggr=[[count(Int64(1)) AS count(*)]] +03)----TableScan: hits projection=[UserID, SearchPhrase] +physical_plan +01)SortPreservingMergeExec: [count(*)@2 DESC], fetch=10 +02)--SortExec: TopK(fetch=10), expr=[count(*)@2 DESC], preserve_partitioning=[true] +03)----AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)] +04)------CoalesceBatchesExec: target_batch_size=8192 +05)--------RepartitionExec: partitioning=Hash([UserID@0, SearchPhrase@1], 4), input_partitions=4 +06)----------AggregateExec: mode=Partial, 
gby=[UserID@0 as UserID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)] +07)------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[UserID, SearchPhrase] diff --git a/datafusion/sqllogictest/test_files/test2.slt b/datafusion/sqllogictest/test_files/test2.slt new file mode 100644 index 000000000000..0ee91b5111b6 --- /dev/null +++ b/datafusion/sqllogictest/test_files/test2.slt @@ -0,0 +1,22 @@ +statement ok +create table t(a int, b varchar) as values (1, 'a'), (2, 'c'), (1, 'a'), (2, 'c'), (1, 'a'); + +query ITI +select a, b, count(*) as c from t group by a, b order by c desc; +---- +1 a 3 +2 c 2 + +query TT +explain select a, b, count(*) as c from t group by a, b order by c desc; +---- +logical_plan +01)Sort: c DESC NULLS FIRST +02)--Projection: t.a, t.b, count(*) AS c +03)----Aggregate: groupBy=[[t.a, t.b]], aggr=[[count(Int64(1)) AS count(*)]] +04)------TableScan: t projection=[a, b] +physical_plan +01)SortExec: expr=[c@2 DESC], preserve_partitioning=[false] +02)--ProjectionExec: expr=[a@0 as a, b@1 as b, count(*)@2 as c] +03)----AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b], aggr=[count(*)] +04)------MemoryExec: partitions=1, partition_sizes=[1] diff --git a/datafusion/sqllogictest/test_files/test3.slt b/datafusion/sqllogictest/test_files/test3.slt new file mode 100644 index 000000000000..35aeec9c985e --- /dev/null +++ b/datafusion/sqllogictest/test_files/test3.slt @@ -0,0 +1,72 @@ + +include ./tpch/create_tables.slt.part + +query TT +explain select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) 
as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order +from + lineitem +where + l_shipdate <= date '1998-09-02' +group by + l_returnflag, + l_linestatus +order by + l_returnflag, + l_linestatus; +---- +logical_plan +01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST +02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, avg(lineitem.l_quantity) AS avg_qty, avg(lineitem.l_extendedprice) AS avg_price, avg(lineitem.l_discount) AS avg_disc, count(*) AS count_order +03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1)) AS count(*)]] +04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS __common_expr_1, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus +05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02") +06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")] +physical_plan +01)SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], 
preserve_partitioning=[false] +02)--ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order] +03)----AggregateExec: mode=SinglePartitioned, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)] +04)------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------FilterExec: l_shipdate@6 <= 1998-09-02 +07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false + + +query TTRRRRRRRI +select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as 
count_order +from + lineitem +where + l_shipdate <= date '1998-09-02' +group by + l_returnflag, + l_linestatus +order by + l_returnflag, + l_linestatus; +---- +A F 3774200 5320753880.69 5054096266.6828 5256751331.449234 25.537587 36002.123829 0.050144 147790 +N F 95257 133737795.84 127132372.6512 132286291.229445 25.300664 35521.326916 0.049394 3765 +N O 7459297 10512270008.9 9986238338.3847 10385578376.585467 25.545537 36000.924688 0.050095 292000 +R F 3785523 5337950526.47 5071818532.942 5274405503.049367 25.525943 35994.029214 0.049989 148301 + +include ./tpch/drop_tables.slt.part