From 515864a23b166e002ae0394a3ea7f2d314170f72 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Thu, 21 Aug 2025 21:15:40 +0200 Subject: [PATCH 01/20] support WHERE pipe operator --- datafusion/sql/src/query.rs | 39 +++++++++++++++-- datafusion/sql/src/select.rs | 2 +- .../sqllogictest/test_files/pipe_operator.slt | 42 +++++++++++++++++++ 3 files changed, 79 insertions(+), 4 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/pipe_operator.slt diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index c06147e08f87..a88ca0c555f4 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -27,8 +27,8 @@ use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, Query, - SelectInto, SetExpr, + Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, PipeOperator, + Query, SelectInto, SetExpr, }; use sqlparser::tokenizer::Span; @@ -48,8 +48,10 @@ impl SqlToRel<'_, S> { self.plan_with_clause(with, planner_context)?; } + let pipe_operators = query.pipe_operators.clone(); + let set_expr = *query.body; - match set_expr { + let plan = match set_expr { SetExpr::Select(mut select) => { let select_into = select.into.take(); let plan = @@ -78,6 +80,37 @@ impl SqlToRel<'_, S> { let plan = self.order_by(plan, order_by_rex)?; self.limit(plan, query.limit_clause, planner_context) } + }?; + + self.pipe_operators(plan, pipe_operators, planner_context) + } + + /// Apply pipe operators to a plan + fn pipe_operators( + &self, + plan: LogicalPlan, + pipe_operators: Vec, + planner_context: &mut PlannerContext, + ) -> Result { + let mut plan = plan; + for pipe_operator in pipe_operators { + plan = self.pipe_operator(plan, pipe_operator, planner_context)?; + } + Ok(plan) + } + + /// Apply a pipe operator to a plan + fn pipe_operator( + &self, + plan: LogicalPlan, + pipe_operator: PipeOperator, + planner_context: &mut PlannerContext, + ) -> Result { + match pipe_operator { + PipeOperator::Where { expr } => { + self.plan_selection(Some(expr), plan, planner_context) + } + x => not_impl_err!("{x} pipe operator is not supported yet"), } } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1a90e5e09b77..bcc6da331813 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -574,7 +574,7 @@ impl SqlToRel<'_, S> { Ok((intermediate_plan, intermediate_select_exprs)) } - fn plan_selection( + pub(crate) fn plan_selection( &self, selection: Option, plan: LogicalPlan, diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt new file mode 100644 index 000000000000..d72480c06e1b --- /dev/null +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# BigQuery supports the pipe operator syntax +# TODO: Make the Generic dialect support the pipe operator syntax +statement ok +set datafusion.sql_parser.dialect = 'BigQuery'; + +statement ok +CREATE TABLE test( + a INT, + b FLOAT, + c VARCHAR, + n VARCHAR +) AS VALUES + (1, 1.1, 'a', NULL), + (2, 2.2, 'b', NULL), + (3, 3.3, 'c', NULL) +; + +# WHERE pipe +query IRTT +SELECT * +FROM test +|> WHERE a > 1 +---- +2 2.2 b NULL +3 3.3 c NULL From 848dc08d83b922b66604bb0c5b8e0df4a8866e9a Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Thu, 21 Aug 2025 21:23:16 +0200 Subject: [PATCH 02/20] support order by --- datafusion/sql/src/query.rs | 10 ++++++++++ datafusion/sqllogictest/test_files/pipe_operator.slt | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index a88ca0c555f4..54cc2b7605d6 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -110,6 +110,16 @@ impl SqlToRel<'_, S> { PipeOperator::Where { expr } => { self.plan_selection(Some(expr), plan, planner_context) } + PipeOperator::OrderBy { exprs } => { + let sort_exprs = self.order_by_to_sort_expr( + exprs, + plan.schema(), + planner_context, + true, + None, + )?; + self.order_by(plan, sort_exprs) + } x => not_impl_err!("{x} pipe operator is not supported yet"), } } diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index d72480c06e1b..42522b7aeb23 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -40,3 +40,13 @@ FROM test ---- 2 2.2 b NULL 3 3.3 c NULL + +# ORDER BY pipe +query IRTT +SELECT * +FROM test +|> ORDER BY a DESC +---- +3 3.3 c NULL +2 2.2 b NULL +1 1.1 a NULL From ac64a79be6936e5682f12cf5e0392338ade7c86e Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Thu, 21 Aug 2025 21:36:50 +0200 Subject: [PATCH 03/20] support limit --- datafusion/sql/src/query.rs | 16 ++++++++++++++-- .../sqllogictest/test_files/pipe_operator.slt | 9 +++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 54cc2b7605d6..50bbcf92c4c4 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -27,8 +27,8 @@ use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, PipeOperator, - Query, SelectInto, SetExpr, + Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr, + OrderByKind, PipeOperator, Query, SelectInto, SetExpr, }; use sqlparser::tokenizer::Span; @@ -120,6 +120,18 @@ impl SqlToRel<'_, S> { )?; self.order_by(plan, sort_exprs) } + PipeOperator::Limit { expr, offset } => self.limit( + plan, + Some(LimitClause::LimitOffset { + limit: Some(expr), + offset: offset.map(|offset| Offset { + value: offset, + rows: OffsetRows::None, + }), + limit_by: vec![], + }), + planner_context, + ), x => not_impl_err!("{x} pipe operator is not supported yet"), } } diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 42522b7aeb23..c3d174959d5f 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -50,3 +50,12 @@ FROM test 3 3.3 c NULL 2 2.2 b NULL 1 1.1 a NULL + +# ORDER BY pipe, limit +query IRTT +SELECT * +FROM test +|> ORDER BY a DESC +|> LIMIT 1 +---- +3 3.3 c NULL From 3f7d647df6b83896cb7f512a84d195ca1c828849 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Thu, 21 Aug 2025 21:44:18 +0200 Subject: [PATCH 04/20] select pipe --- datafusion/sql/src/query.rs | 8 +++++++- datafusion/sql/src/select.rs | 8 ++++++-- datafusion/sqllogictest/test_files/pipe_operator.slt | 10 ++++++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 50bbcf92c4c4..d68df64963ae 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -132,7 +132,13 @@ impl SqlToRel<'_, S> { }), planner_context, ), - x => not_impl_err!("{x} pipe operator is not supported yet"), + PipeOperator::Select { exprs } => { + let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_)); + let select_exprs = + self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?; + self.project(plan, select_exprs) + } + x => not_impl_err!("`{x}` pipe operator is not supported yet"), } } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index bcc6da331813..bb6d4f96ba58 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -655,7 +655,7 @@ impl SqlToRel<'_, S> { } /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions. - fn prepare_select_exprs( + pub(crate) fn prepare_select_exprs( &self, plan: &LogicalPlan, projection: Vec, @@ -823,7 +823,11 @@ impl SqlToRel<'_, S> { } /// Wrap a plan in a projection - fn project(&self, input: LogicalPlan, expr: Vec) -> Result { + pub(crate) fn project( + &self, + input: LogicalPlan, + expr: Vec, + ) -> Result { // convert to Expr for validate_schema_satisfies_exprs let exprs = expr .iter() diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index c3d174959d5f..a28553dbb39a 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -59,3 +59,13 @@ FROM test |> LIMIT 1 ---- 3 3.3 c NULL + +# SELECT pipe +query I +SELECT * +FROM test +|> SELECT a +---- +1 +2 +3 From 420bf001a9feeca21f5d9cd01ee48bfc9eb3e8c6 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Thu, 21 Aug 2025 21:59:32 +0200 Subject: [PATCH 05/20] extend support --- datafusion/sql/src/query.rs | 14 ++++++++++++++ .../sqllogictest/test_files/pipe_operator.slt | 11 +++++++++++ 2 files changed, 25 insertions(+) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index d68df64963ae..28ebf5469545 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -23,6 +23,7 @@ use crate::stack::StackGuard; use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; use datafusion_expr::expr::Sort; +use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; @@ -138,6 +139,19 @@ impl SqlToRel<'_, S> { self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?; self.project(plan, select_exprs) } + PipeOperator::Extend { exprs } => { + let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_)); + let extend_exprs = + self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?; + let all_exprs = plan + .expressions() + .into_iter() + .map(SelectExpr::Expression) + .chain(extend_exprs) + .collect(); + self.project(plan, all_exprs) + } + x => not_impl_err!("`{x}` pipe operator is not supported yet"), } } diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index a28553dbb39a..debe7d1fd8d2 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -69,3 +69,14 @@ FROM test 1 2 3 + +# EXTEND pipe +query IRR +SELECT * +FROM test +|> SELECT a, b +|> EXTEND a + b AS a_plus_b +---- +1 1.1 2.1 +2 2.2 4.2 +3 3.3 6.3 From 88343e8e9aefd74cdd4037f91a95cf82a07c6d11 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Fri, 22 Aug 2025 16:49:41 +0200 Subject: [PATCH 06/20] document supported pipe operators in user guide --- docs/source/user-guide/sql/operators.md | 88 +++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index b63f55239621..3e3a2796def2 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -613,3 +613,91 @@ bar") | bar | +-----------------+ ``` + +## Pipe operators +Some SQL dialects (e.g. BigQuery) support the pipe operator `|>`. +The SQL dialect can be set like this: +```sql +set datafusion.sql_parser.dialect = 'BigQuery'; +``` + +DataFusion currently supports the following pipe operators: +- [WHERE](#pipe_where) +- [ORDER BY](#pipe_order_by) +- [LIMIT](#pipe_limit) +- [SELECT](#pipe_select) +- [EXTEND](#pipe_extend) + + +(pipe_where)= +### WHERE + +```sql +> select * from range(0,10) +|> where value < 2; ++-------+ +| value | ++-------+ +| 0 | +| 1 | ++-------+ +``` + +(pipe_order_by)= +### ORDER BY + +```sql +> select * from range(0,3) +|> order by value desc; ++-------+ +| value | ++-------+ +| 2 | +| 1 | +| 0 | ++-------+ +``` + +(pipe_limit)= +### LIMIT + +```sql +> select * from range(0,3) +|> order by value desc +|> limit 1; ++-------+ +| value | ++-------+ +| 2 | ++-------+ +``` + +(pipe_select)= +### SELECT + +```sql +> select * from range(0,3) +|> select value + 10; ++---------------------------+ +| range().value + Int64(10) | ++---------------------------+ +| 10 | +| 11 | +| 12 | ++---------------------------+ +``` + +(pipe_extend)= +### EXTEND + +```sql +> select * from range(0,3) +|> extend -value AS minus_value; ++-------+-------------+ +| value | minus_value | ++-------+-------------+ +| 0 | 0 | +| 1 | -1 | +| 2 | -2 | ++-------+-------------+ +``` From ca453b232154749ceed8c3f88fb55eecd7626770 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Fri, 22 Aug 2025 21:48:50 +0200 Subject: [PATCH 07/20] fmt --- docs/source/user-guide/sql/operators.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index 3e3a2796def2..7bbfa9aad8c0 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -615,21 +615,24 @@ bar | ``` ## Pipe operators + Some SQL dialects (e.g. BigQuery) support the pipe operator `|>`. The SQL dialect can be set like this: + ```sql set datafusion.sql_parser.dialect = 'BigQuery'; ``` DataFusion currently supports the following pipe operators: + - [WHERE](#pipe_where) - [ORDER BY](#pipe_order_by) - [LIMIT](#pipe_limit) - [SELECT](#pipe_select) - [EXTEND](#pipe_extend) - (pipe_where)= + ### WHERE ```sql @@ -644,6 +647,7 @@ DataFusion currently supports the following pipe operators: ``` (pipe_order_by)= + ### ORDER BY ```sql @@ -659,6 +663,7 @@ DataFusion currently supports the following pipe operators: ``` (pipe_limit)= + ### LIMIT ```sql @@ -673,6 +678,7 @@ DataFusion currently supports the following pipe operators: ``` (pipe_select)= + ### SELECT ```sql @@ -688,6 +694,7 @@ DataFusion currently supports the following pipe operators: ``` (pipe_extend)= + ### EXTEND ```sql From 1e0b8db994f020b2bc5a3c8d50826f86a8862b6d Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Mon, 25 Aug 2025 22:57:12 +0200 Subject: [PATCH 08/20] fix where pipe before extend --- datafusion/sql/src/query.rs | 12 +++++------- datafusion/sqllogictest/test_files/pipe_operator.slt | 9 +++++++++ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 28ebf5469545..d13578a219b2 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -21,7 +21,7 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::stack::StackGuard; use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; -use datafusion_expr::expr::Sort; +use datafusion_expr::expr::{Sort, WildcardOptions}; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{ @@ -143,12 +143,10 @@ impl SqlToRel<'_, S> { let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_)); let extend_exprs = self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?; - let all_exprs = plan - .expressions() - .into_iter() - .map(SelectExpr::Expression) - .chain(extend_exprs) - .collect(); + let all_exprs = + std::iter::once(SelectExpr::Wildcard(WildcardOptions::default())) + .chain(extend_exprs) + .collect(); self.project(plan, all_exprs) } diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index debe7d1fd8d2..6b92df943138 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -80,3 +80,12 @@ FROM test 1 1.1 2.1 2 2.2 4.2 3 3.3 6.3 + +query IRR +SELECT * +FROM test +|> SELECT a, b +|> where a = 1 +|> EXTEND a + b AS a_plus_b +---- +1 1.1 2.1 From 9da7e4fa324bf0d282a53a08421e6e820dc18e63 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 24 Aug 2025 17:34:50 +0200 Subject: [PATCH 09/20] support AS --- datafusion-testing | 2 +- datafusion/sql/src/query.rs | 10 +++++++++- .../sqllogictest/test_files/pipe_operator.slt | 11 +++++++++++ docs/source/user-guide/sql/operators.md | 18 ++++++++++++++++++ simon/data.csv | 3 +++ simon/query.sql | 10 ++++++++++ 6 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 simon/data.csv create mode 100644 simon/query.sql diff --git a/datafusion-testing b/datafusion-testing index f72ac4075ada..e9f9e22ccf09 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit f72ac4075ada5ea9810551bc0c3e3161c61204a2 +Subproject commit e9f9e22ccf09145a7368f80fd6a871f11e2b4481 diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index d13578a219b2..c5e76932765e 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -29,7 +29,7 @@ use datafusion_expr::{ }; use sqlparser::ast::{ Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr, - OrderByKind, PipeOperator, Query, SelectInto, SetExpr, + OrderByKind, PipeOperator, Query, SelectInto, SetExpr, TableAlias, }; use sqlparser::tokenizer::Span; @@ -149,6 +149,14 @@ impl SqlToRel<'_, S> { .collect(); self.project(plan, all_exprs) } + PipeOperator::As { alias } => self.apply_table_alias( + plan, + TableAlias { + name: alias, + // Apply to all fields + columns: vec![], + }, + ), x => not_impl_err!("`{x}` pipe operator is not supported yet"), } diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 6b92df943138..e2efb0128d74 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -89,3 +89,14 @@ FROM test |> EXTEND a + b AS a_plus_b ---- 1 1.1 2.1 + +# AS pipe +query I +SELECT * +FROM test +|> as test_pipe +|> select test_pipe.a +---- +1 +2 +3 diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index 7bbfa9aad8c0..d3cb548a73ee 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -630,6 +630,7 @@ DataFusion currently supports the following pipe operators: - [LIMIT](#pipe_limit) - [SELECT](#pipe_select) - [EXTEND](#pipe_extend) +- [AS](#pipe_as) (pipe_where)= @@ -708,3 +709,20 @@ DataFusion currently supports the following pipe operators: | 2 | -2 | +-------+-------------+ ``` + +(pipe_as)= + +### AS + +```sql +> select * from range(0,3) +|> as my_range +|> SELECT my_range.value; ++-------+ +| value | ++-------+ +| 0 | +| 1 | +| 2 | ++-------+ +``` diff --git a/simon/data.csv b/simon/data.csv new file mode 100644 index 000000000000..eba8ed27a30f --- /dev/null +++ b/simon/data.csv @@ -0,0 +1,3 @@ +afdeling,budget,new_budget +a,2,2 +b,1,2 diff --git a/simon/query.sql b/simon/query.sql new file mode 100644 index 000000000000..0f2d0d7f6f3d --- /dev/null +++ b/simon/query.sql @@ -0,0 +1,10 @@ +set datafusion.sql_parser.dialect = 'BigQuery'; + +CREATE EXTERNAL TABLE test +STORED AS CSV +LOCATION '/Users/svs/code/datafusion/simon/data.csv' +OPTIONS ('has_header' 'true'); + +select * from test +|> where afdeling = 'a' +|> extend budget - new_budget as diff From b2902161e8e21427bdcb0f41f3d6f1bfcaa07e42 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 24 Aug 2025 17:55:53 +0200 Subject: [PATCH 10/20] support union --- datafusion/sql/src/query.rs | 22 ++++++++++++++++++- .../sqllogictest/test_files/pipe_operator.slt | 16 ++++++++++++++ docs/source/user-guide/sql/operators.md | 22 +++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index c5e76932765e..7aa9f3a4f103 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -29,7 +29,7 @@ use datafusion_expr::{ }; use sqlparser::ast::{ Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr, - OrderByKind, PipeOperator, Query, SelectInto, SetExpr, TableAlias, + OrderByKind, PipeOperator, Query, SelectInto, SetExpr, SetOperator, TableAlias, }; use sqlparser::tokenizer::Span; @@ -157,6 +157,26 @@ impl SqlToRel<'_, S> { columns: vec![], }, ), + PipeOperator::Union { + set_quantifier, + queries, + } => { + let left_plan = plan; + let mut result_plan = left_plan; + + // Process each query in the union + for query in queries { + let right_plan = self.query_to_plan(query, planner_context)?; + result_plan = self.set_operation_to_plan( + SetOperator::Union, + result_plan, + right_plan, + set_quantifier, + )?; + } + + Ok(result_plan) + } x => not_impl_err!("`{x}` pipe operator is not supported yet"), } diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index e2efb0128d74..915fbb54418e 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -100,3 +100,19 @@ FROM test 1 2 3 + +# UNION pipe +query I +SELECT * +FROM test +|> select a +|> UNION ALL ( + SELECT a FROM test +); +---- +1 +2 +3 +1 +2 +3 diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index d3cb548a73ee..ca7c29e4fefd 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -631,6 +631,7 @@ DataFusion currently supports the following pipe operators: - [SELECT](#pipe_select) - [EXTEND](#pipe_extend) - [AS](#pipe_as) +- [UNION](#pipe_union) (pipe_where)= @@ -726,3 +727,24 @@ DataFusion currently supports the following pipe operators: | 2 | +-------+ ``` + +(pipe_union)= + +### UNION + +```sql +> select * from range(0,3) +|> union all ( + select * from range(3,6) +); ++-------+ +| value | ++-------+ +| 0 | +| 1 | +| 2 | +| 3 | +| 4 | +| 5 | ++-------+ +``` From 5be5480321733f32834f73dd707e8a6a03ec91b7 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 24 Aug 2025 18:11:33 +0200 Subject: [PATCH 11/20] support intersection --- datafusion/sql/src/query.rs | 60 +++++++++++++------ .../sqllogictest/test_files/pipe_operator.slt | 9 +++ docs/source/user-guide/sql/operators.md | 17 ++++++ 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 7aa9f3a4f103..77ea0b90a1ce 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -29,7 +29,8 @@ use datafusion_expr::{ }; use sqlparser::ast::{ Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr, - OrderByKind, PipeOperator, Query, SelectInto, SetExpr, SetOperator, TableAlias, + OrderByKind, PipeOperator, Query, SelectInto, SetExpr, SetOperator, SetQuantifier, + TableAlias, }; use sqlparser::tokenizer::Span; @@ -160,26 +161,51 @@ impl SqlToRel<'_, S> { PipeOperator::Union { set_quantifier, queries, - } => { - let left_plan = plan; - let mut result_plan = left_plan; + } => self.pipe_operator_set( + plan, + SetOperator::Union, + set_quantifier, + queries, + planner_context, + ), + PipeOperator::Intersect { + set_quantifier, + queries, + } => self.pipe_operator_set( + plan, + SetOperator::Intersect, + set_quantifier, + queries, + planner_context, + ), - // Process each query in the union - for query in queries { - let right_plan = self.query_to_plan(query, planner_context)?; - result_plan = self.set_operation_to_plan( - SetOperator::Union, - result_plan, - right_plan, - set_quantifier, - )?; - } + x => not_impl_err!("`{x}` pipe operator is not supported yet"), + } + } - Ok(result_plan) - } + /// Handle Union/Intersect pipe operators + fn pipe_operator_set( + &self, + plan: LogicalPlan, + set_operator: SetOperator, + set_quantifier: SetQuantifier, + queries: Vec, + planner_context: &mut PlannerContext, + ) -> Result { + let mut result_plan = plan; - x => not_impl_err!("`{x}` pipe operator is not supported yet"), + // Process each query + for query in queries { + let right_plan = self.query_to_plan(query, planner_context)?; + result_plan = self.set_operation_to_plan( + set_operator, + result_plan, + right_plan, + set_quantifier, + )?; } + + Ok(result_plan) } /// Wrap a plan in a limit diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 915fbb54418e..36f2293a2071 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -116,3 +116,12 @@ FROM test 1 2 3 + +# INTERSECT pipe +query I rowsort +SELECT * FROM range(0,3) +|> INTERSECT DISTINCT + (SELECT * FROM range(1,3)); +---- +1 +2 diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index ca7c29e4fefd..c39728896fdc 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -632,6 +632,7 @@ DataFusion currently supports the following pipe operators: - [EXTEND](#pipe_extend) - [AS](#pipe_as) - [UNION](#pipe_union) +- [INTERSECT](#pipe_intersect) (pipe_where)= @@ -748,3 +749,19 @@ DataFusion currently supports the following pipe operators: | 5 | +-------+ ``` + +(pipe_intersect)= + +### INTERSECT + +```sql +> select * from range(0,100) +|> INTERSECT DISTINCT ( + select 3 +); ++-------+ +| value | ++-------+ +| 3 | ++-------+ +``` From 223eae41273e93460d7db5aa4e934de29a06e2ec Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 24 Aug 2025 18:24:51 +0200 Subject: [PATCH 12/20] support except --- datafusion/sql/src/query.rs | 12 +++++++++++- .../sqllogictest/test_files/pipe_operator.slt | 11 +++++++++++ docs/source/user-guide/sql/operators.md | 19 +++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 77ea0b90a1ce..0501cc05eb65 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -178,12 +178,22 @@ impl SqlToRel<'_, S> { queries, planner_context, ), + PipeOperator::Except { + set_quantifier, + queries, + } => self.pipe_operator_set( + plan, + SetOperator::Except, + set_quantifier, + queries, + planner_context, + ), x => not_impl_err!("`{x}` pipe operator is not supported yet"), } } - /// Handle Union/Intersect pipe operators + /// Handle Union/Intersect/Except pipe operators fn pipe_operator_set( &self, plan: LogicalPlan, diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 36f2293a2071..37b0ff550449 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -125,3 +125,14 @@ SELECT * FROM range(0,3) ---- 1 2 + +# EXCEPT pipe +query I rowsort +select * from range(0,10) +|> EXCEPT DISTINCT (select * from range(5,10)); +---- +0 +1 +2 +3 +4 diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index c39728896fdc..73ca530d2d64 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -633,6 +633,7 @@ DataFusion currently supports the following pipe operators: - [AS](#pipe_as) - [UNION](#pipe_union) - [INTERSECT](#pipe_intersect) +- [EXCEPT](#pipe_except) (pipe_where)= @@ -765,3 +766,21 @@ DataFusion currently supports the following pipe operators: | 3 | +-------+ ``` + +(pipe_except)= + +### EXCEPT + +```sql +> select * from range(0,10) +|> EXCEPT DISTINCT (select * from range(5,10)); ++-------+ +| value | ++-------+ +| 0 | +| 1 | +| 2 | +| 3 | +| 4 | ++-------+ +``` From 39e1f26e3b7eebf61c67cc7c08d202607639ffb7 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 24 Aug 2025 19:02:11 +0200 Subject: [PATCH 13/20] support aggregate --- datafusion/sql/src/query.rs | 85 ++++++++++++++++++- .../sqllogictest/test_files/pipe_operator.slt | 41 +++++++++ docs/source/user-guide/sql/operators.md | 15 ++++ 3 files changed, 138 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 0501cc05eb65..63e71c82aec5 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -28,9 +28,9 @@ use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr, - OrderByKind, PipeOperator, Query, SelectInto, SetExpr, SetOperator, SetQuantifier, - TableAlias, + Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows, + OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr, + SetOperator, SetQuantifier, TableAlias, }; use sqlparser::tokenizer::Span; @@ -188,6 +188,15 @@ impl SqlToRel<'_, S> { queries, planner_context, ), + PipeOperator::Aggregate { + full_table_exprs, + group_by_expr, + } => self.pipe_operator_aggregate( + plan, + full_table_exprs, + group_by_expr, + planner_context, + ), x => not_impl_err!("`{x}` pipe operator is not supported yet"), } @@ -294,6 +303,76 @@ impl SqlToRel<'_, S> { } } + /// Handle AGGREGATE pipe operator + fn pipe_operator_aggregate( + &self, + plan: LogicalPlan, + full_table_exprs: Vec, + group_by_expr: Vec, + planner_context: &mut PlannerContext, + ) -> Result { + // Convert aggregate expressions directly + let aggr_exprs: Vec = full_table_exprs + .into_iter() + .map(|expr_with_alias_and_order_by| { + let expr_with_alias = expr_with_alias_and_order_by.expr; + let sql_expr = expr_with_alias.expr; + let alias = expr_with_alias.alias; + + // Convert SQL expression to DataFusion expression + let df_expr = + self.sql_to_expr(sql_expr, plan.schema(), planner_context)?; + + // Apply alias if present, but handle the case where the expression might already be aliased + match alias { + Some(alias_ident) => { + // If the expression is already an alias, replace the alias name + match df_expr { + Expr::Alias(alias_expr) => { + Ok(alias_expr.expr.alias(alias_ident.value)) + } + _ => Ok(df_expr.alias(alias_ident.value)), + } + } + None => Ok(df_expr), + } + }) + .collect::>>()?; + + // Convert group by expressions directly + let group_by_exprs: Vec = group_by_expr + .into_iter() + .map(|expr_with_alias_and_order_by| { + let expr_with_alias = expr_with_alias_and_order_by.expr; + let sql_expr = expr_with_alias.expr; + let alias = expr_with_alias.alias; + + // Convert SQL expression to DataFusion expression + let df_expr = + self.sql_to_expr(sql_expr, plan.schema(), planner_context)?; + + // Apply alias if present (though group by aliases are less common) + match alias { + Some(alias_ident) => { + // If the expression is already an alias, replace the alias name + match df_expr { + Expr::Alias(alias_expr) => { + Ok(alias_expr.expr.alias(alias_ident.value)) + } + _ => Ok(df_expr.alias(alias_ident.value)), + } + } + None => Ok(df_expr), + } + }) + .collect::>>()?; + + // Create the aggregate logical plan + LogicalPlanBuilder::from(plan) + .aggregate(group_by_exprs, aggr_exprs)? + .build() + } + /// Wrap the logical plan in a `SelectInto` fn select_into( &self, diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 37b0ff550449..57d1fc064201 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -136,3 +136,44 @@ select * from range(0,10) 2 3 4 + +# AGGREGATE pipe +query II +( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales + UNION ALL + SELECT 'apples' AS item, 7 AS sales +) +|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales; +---- +3 14 + +query TII rowsort +( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales + UNION ALL + SELECT 'apples' AS item, 7 AS sales +) +|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales + GROUP BY item; +---- +apples 2 9 +bananas 1 5 + +query TII rowsort +( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales + UNION ALL + SELECT 'apples' AS item, 7 AS sales +) +|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales + GROUP BY item +|> WHERE num_items > 1; +---- +apples 2 9 diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index 73ca530d2d64..be21af17c720 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -634,6 +634,7 @@ DataFusion currently supports the following pipe operators: - [UNION](#pipe_union) - [INTERSECT](#pipe_intersect) - [EXCEPT](#pipe_except) +- [AGGREGATE](#pipe_aggregate) (pipe_where)= @@ -784,3 +785,17 @@ DataFusion currently supports the following pipe operators: | 4 | +-------+ ``` + +(pipe_aggregate)= + +### AGGREGATE + +```sql +> select * from range(0,3) +|> aggregate sum(value) AS total; ++-------+ +| total | ++-------+ +| 3 | ++-------+ +``` From adb7258b3f7e2121975628be5dc92e5988767a19 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Tue, 26 Aug 2025 17:59:35 +0200 Subject: [PATCH 14/20] support join operator --- datafusion/sql/src/query.rs | 3 +++ datafusion/sql/src/relation/join.rs | 2 +- .../sqllogictest/test_files/pipe_operator.slt | 18 +++++++++++++ docs/source/user-guide/sql/operators.md | 26 +++++++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 63e71c82aec5..09d531b89286 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -197,6 +197,9 @@ impl SqlToRel<'_, S> { group_by_expr, planner_context, ), + PipeOperator::Join(join) => { + self.parse_relation_join(plan, join, planner_context) + } x => not_impl_err!("`{x}` pipe operator is not supported yet"), } diff --git a/datafusion/sql/src/relation/join.rs b/datafusion/sql/src/relation/join.rs index 10491963e3ce..f8603e29bdcf 100644 --- a/datafusion/sql/src/relation/join.rs +++ b/datafusion/sql/src/relation/join.rs @@ -43,7 +43,7 @@ impl SqlToRel<'_, S> { Ok(left) } - fn parse_relation_join( + pub(crate) fn parse_relation_join( &self, left: LogicalPlan, join: Join, diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 57d1fc064201..5908b3d6b2a4 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -177,3 +177,21 @@ query TII rowsort |> WHERE num_items > 1; ---- apples 2 9 + +# JOIN pipe +query TII +( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales +) +|> AS produce_sales +|> LEFT JOIN + ( + SELECT "apples" AS item, 123 AS id + ) AS produce_data + ON produce_sales.item = produce_data.item +|> SELECT produce_sales.item, sales, id; +---- +apples 2 123 +bananas 5 NULL diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index be21af17c720..d24cb33c19a0 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -631,6 +631,7 @@ DataFusion currently supports the following pipe operators: - [SELECT](#pipe_select) - [EXTEND](#pipe_extend) - [AS](#pipe_as) +- [JOIN](#pipe_join) - [UNION](#pipe_union) - [INTERSECT](#pipe_intersect) - [EXCEPT](#pipe_except) @@ -731,6 +732,31 @@ DataFusion currently supports the following pipe operators: +-------+ ``` +(pipe_join)= + +### JOIN + +```sql +> ( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales +) +|> AS produce_sales +|> LEFT JOIN + ( + SELECT 'apples' AS item, 123 AS id + ) AS produce_data + ON produce_sales.item = produce_data.item +|> SELECT produce_sales.item, sales, id; ++--------+-------+------+ +| item | sales | id | ++--------+-------+------+ +| apples | 2 | 123 | +| bananas| 5 | NULL | ++--------+-------+------+ +``` + (pipe_union)= ### UNION From b5840745de4a7419360aeb28bfd4288f26c2c757 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Tue, 26 Aug 2025 18:27:17 +0200 Subject: [PATCH 15/20] support pivot --- datafusion/sql/src/query.rs | 157 +++++++++++++++++- .../sqllogictest/test_files/pipe_operator.slt | 99 +++++++++++ docs/source/user-guide/sql/operators.md | 28 ++++ 3 files changed, 280 insertions(+), 4 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 09d531b89286..761167ae8f94 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -20,17 +20,18 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::stack::StackGuard; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; -use datafusion_expr::expr::{Sort, WildcardOptions}; +use datafusion_expr::expr::{AggregateFunction, Sort, WildcardOptions}; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{ - CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, + col, CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows, - OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr, - SetOperator, SetQuantifier, TableAlias, + OrderBy, OrderByExpr, OrderByKind, PipeOperator, PivotValueSource, Query, SelectInto, + SetExpr, SetOperator, SetQuantifier, TableAlias, }; use sqlparser::tokenizer::Span; @@ -200,6 +201,19 @@ impl SqlToRel<'_, S> { PipeOperator::Join(join) => { self.parse_relation_join(plan, join, planner_context) } + PipeOperator::Pivot { + aggregate_functions, + value_column, + value_source, + alias, + } => self.pipe_operator_pivot( + plan, + aggregate_functions, + value_column, + value_source, + alias, + planner_context, + ), x => not_impl_err!("`{x}` pipe operator is not supported yet"), } @@ -376,6 +390,141 @@ impl SqlToRel<'_, S> { .build() } + /// Handle PIVOT pipe operator + fn pipe_operator_pivot( + &self, + plan: LogicalPlan, + aggregate_functions: Vec, + value_column: Vec, + value_source: PivotValueSource, + alias: Option, + planner_context: &mut PlannerContext, + ) -> Result { + // Extract pivot values from the value source + let pivot_values = if let PivotValueSource::List(values) = value_source { + values + } else { + return not_impl_err!( + "Only static pivot value lists are supported currently" + ); + }; + + // Convert pivot column to DataFusion expression + if value_column.len() != 1 { + return not_impl_err!("Multi-column pivot is not supported yet"); + } + let pivot_col_name = &value_column[0].value; + let pivot_col_expr = col(pivot_col_name); + + let input_schema = plan.schema(); + + // Convert sql to DF exprs + let aggregate_functions = aggregate_functions + .into_iter() + .map(|f| self.sql_to_expr_with_alias(f, input_schema, planner_context)) + .collect::, _>>()?; + + // Convert aggregate functions to logical expressions to extract measure columns + let mut measure_columns = std::collections::HashSet::new(); + for agg_func_with_alias in &aggregate_functions { + agg_func_with_alias.apply(|e| { + if let Expr::Column(col) = e { + measure_columns.insert(col.name.clone()); + }; + Ok(TreeNodeRecursion::Continue) + })?; + } + + // Get all column names from the input plan to determine group-by columns. + // Add all columns except the pivot column and measure columns to group by + let mut group_by_cols = Vec::new(); + for field in input_schema.fields() { + let col_name = field.name(); + if col_name != pivot_col_name && !measure_columns.contains(col_name) { + group_by_cols.push(col(col_name)); + } + } + + // Create aggregate expressions for each pivot value + let mut aggr_exprs = Vec::new(); + + // For each pivot value and aggregate function combination, create a conditional aggregate + // Process pivot values first to get the desired column order + for pivot_value in pivot_values { + let pivot_value_expr = self.sql_to_expr( + pivot_value.expr.clone(), + input_schema, + planner_context, + )?; + for agg_func_with_alias in &aggregate_functions { + let (alias_name, mut agg_fn) = match agg_func_with_alias { + Expr::Alias(alias) => match *alias.expr.clone() { + Expr::Alias(inner_alias) => { + let Expr::AggregateFunction( + agg_func @ AggregateFunction { .. }, + ) = *inner_alias.expr.clone() + else { + return not_impl_err!("Only function expressions are supported in PIVOT aggregate functions"); + }; + (Some(alias.name.clone()), agg_func) + } + Expr::AggregateFunction(agg_func @ AggregateFunction { .. }) => { + (Some(alias.name.clone()), agg_func) + } + _ => { + return not_impl_err!("Only function expressions are supported in PIVOT aggregate functions"); + } + }, + Expr::AggregateFunction(agg_func) => (None, agg_func.clone()), + _ => { + return not_impl_err!("Expected aggregate function"); + } + }; + + let new_filter = pivot_col_expr.clone().eq(pivot_value_expr.clone()); + if let Some(existing_filter) = agg_fn.params.filter { + agg_fn.params.filter = + Some(Box::new(existing_filter.and(new_filter))); + } else { + agg_fn.params.filter = Some(Box::new(new_filter)); + } + + let agg_expr = Expr::AggregateFunction(agg_fn); + let aggr_func_alias = alias_name.unwrap_or(agg_expr.name_for_alias()?); + + let pivot_value_name = if let Some(alias) = &pivot_value.alias { + alias.value.clone() + } else { + // Use the pivot value as column name, stripping quotes + pivot_value.expr.to_string().trim_matches('\'').to_string() + }; + + aggr_exprs.push( + // Give unique name based on pivot column name + agg_expr.alias(format!("{aggr_func_alias}_{pivot_value_name}")), + ); + } + } + + // Create the aggregate logical plan + let result_plan = LogicalPlanBuilder::from(plan) + .aggregate(group_by_cols, aggr_exprs)? + .build()?; + + // Apply table alias if provided + if let Some(table_alias) = alias { + self.apply_table_alias( + result_plan, + TableAlias { + name: table_alias, + columns: vec![], + }, + ) + } else { + Ok(result_plan) + } + } + /// Wrap the logical plan in a `SelectInto` fn select_into( &self, diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 5908b3d6b2a4..406d8b9b324b 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -195,3 +195,102 @@ query TII ---- apples 2 123 bananas 5 NULL + +# PIVOT pipe + +statement ok +CREATE TABLE pipe_test( + product VARCHAR, + sales INT, + quarter VARCHAR, + year INT +) AS VALUES + ('Kale', 51, 'Q1', 2020), + ('Kale', 23, 'Q2', 2020), + ('Kale', 45, 'Q3', 2020), + ('Kale', 3, 'Q4', 2020), + ('Kale', 70, 'Q1', 2021), + ('Kale', 85, 'Q2', 2021), + ('Apple', 77, 'Q1', 2020), + ('Apple', 0, 'Q2', 2020), + ('Apple', 1, 'Q1', 2021) +; + +query TIIIII rowsort +SELECT * FROM pipe_test +|> PIVOT(SUM(sales) FOR quarter IN ('Q1', 'Q2', 'Q3', 'Q4')); +---- +Apple 2020 77 0 NULL NULL +Apple 2021 1 NULL NULL NULL +Kale 2020 51 23 45 3 +Kale 2021 70 85 NULL NULL + +query TIIII rowsort +SELECT * FROM pipe_test +|> select product, sales, quarter +|> PIVOT(SUM(sales) FOR quarter IN ('Q1', 'Q2', 'Q3', 'Q4')); +---- +Apple 78 0 NULL NULL +Kale 121 108 45 3 + +query TIII rowsort +SELECT * FROM pipe_test +|> select product, sales, quarter +|> PIVOT(SUM(sales) FOR quarter IN ('Q1', 'Q2', 'Q3')); +---- +Apple 78 0 NULL +Kale 121 108 45 + +query TIIII rowsort +SELECT * FROM pipe_test +|> select product, sales, quarter +|> PIVOT(SUM(sales) as total_sales, count(*) as num_records FOR quarter IN ('Q1', 'Q2')); +---- +Apple 78 2 0 1 +Kale 121 2 108 2 + + +query TT +EXPLAIN SELECT * FROM pipe_test +|> select product, sales, quarter +|> PIVOT(SUM(sales) as total_sales, count(*) as num_records FOR quarter IN ('Q1', 'Q2')); +---- +logical_plan +01)Aggregate: groupBy=[[pipe_test.product]], aggr=[[sum(__common_expr_1) FILTER (WHERE __common_expr_2) AS total_sales_Q1, count(Int64(1)) FILTER (WHERE __common_expr_2) AS num_records_Q1, sum(__common_expr_1) FILTER (WHERE __common_expr_3) AS total_sales_Q2, count(Int64(1)) FILTER (WHERE __common_expr_3) AS num_records_Q2]] +02)--Projection: CAST(pipe_test.sales AS Int64) AS __common_expr_1, pipe_test.quarter = Utf8View("Q1") AS __common_expr_2, pipe_test.quarter = Utf8View("Q2") AS __common_expr_3, pipe_test.product +03)----TableScan: pipe_test projection=[product, sales, quarter] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[product@0 as product], aggr=[total_sales_Q1, num_records_Q1, total_sales_Q2, num_records_Q2] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([product@0], 4), input_partitions=4 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------AggregateExec: mode=Partial, gby=[product@3 as product], aggr=[total_sales_Q1, num_records_Q1, total_sales_Q2, num_records_Q2] +06)----------ProjectionExec: expr=[CAST(sales@1 AS Int64) as __common_expr_1, quarter@2 = Q1 as __common_expr_2, quarter@2 = Q2 as __common_expr_3, product@0 as product] +07)------------DataSourceExec: partitions=1, partition_sizes=[1] + +# With explicit pivot value alias +query TT +EXPLAIN SELECT * FROM pipe_test +|> select product, sales, quarter +|> PIVOT(SUM(sales) as total_sales, count(*) as num_records FOR quarter IN ('Q1' as q1, 'Q2')); +---- +logical_plan +01)Aggregate: groupBy=[[pipe_test.product]], aggr=[[sum(__common_expr_1) FILTER (WHERE __common_expr_2) AS total_sales_q1, count(Int64(1)) FILTER (WHERE __common_expr_2) AS num_records_q1, sum(__common_expr_1) FILTER (WHERE __common_expr_3) AS total_sales_Q2, count(Int64(1)) FILTER (WHERE __common_expr_3) AS num_records_Q2]] +02)--Projection: CAST(pipe_test.sales AS Int64) AS __common_expr_1, pipe_test.quarter = Utf8View("Q1") AS __common_expr_2, pipe_test.quarter = Utf8View("Q2") AS __common_expr_3, pipe_test.product +03)----TableScan: pipe_test projection=[product, sales, quarter] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[product@0 as product], aggr=[total_sales_q1, num_records_q1, total_sales_Q2, num_records_Q2] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([product@0], 4), input_partitions=4 +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------AggregateExec: mode=Partial, gby=[product@3 as product], aggr=[total_sales_q1, num_records_q1, total_sales_Q2, num_records_Q2] +06)----------ProjectionExec: expr=[CAST(sales@1 AS Int64) as __common_expr_1, quarter@2 = Q1 as __common_expr_2, quarter@2 = Q2 as __common_expr_3, product@0 as product] +07)------------DataSourceExec: partitions=1, partition_sizes=[1] + +# Aggregation functions with multiple parameters +query TTT rowsort +SELECT product, sales, quarter FROM pipe_test +|> PIVOT(string_agg(sales, '_' order by sales) as agg FOR quarter IN ('Q1', 'Q2')); +---- +Apple 1_77 0 +Kale 51_70 23_85 diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index d24cb33c19a0..103b10b1287d 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -636,6 +636,7 @@ DataFusion currently supports the following pipe operators: - [INTERSECT](#pipe_intersect) - [EXCEPT](#pipe_except) - [AGGREGATE](#pipe_aggregate) +- [PIVOT](#pipe_pivot) (pipe_where)= @@ -825,3 +826,30 @@ DataFusion currently supports the following pipe operators: | 3 | +-------+ ``` + +(pipe_pivot)= + +### PIVOT + +Rotates rows into columns. + +```sql +> ( + SELECT 'kale' AS product, 51 AS sales, 'Q1' AS quarter + UNION ALL + SELECT 'kale' AS product, 4 AS sales, 'Q1' AS quarter + UNION ALL + SELECT 'kale' AS product, 45 AS sales, 'Q2' AS quarter + UNION ALL + SELECT 'apple' AS product, 8 AS sales, 'Q1' AS quarter + UNION ALL + SELECT 'apple' AS product, 10 AS sales, 'Q2' AS quarter +) +|> PIVOT(SUM(sales) FOR quarter IN ('Q1', 'Q2')); ++---------+-----+-----+ +| product | Q1 | Q2 | ++---------+-----+-----+ +| apple | 8 | 10 | +| kale | 55 | 45 | ++---------+-----+-----+ +``` From 6756c279105ff9ebf65dd35b79c2d8e58d0e8650 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 5 Oct 2025 20:13:37 +0200 Subject: [PATCH 16/20] remove unused --- datafusion/sql/src/query.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index d5a88ac32eb9..1b15899fddd8 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -51,8 +51,6 @@ impl SqlToRel<'_, S> { self.plan_with_clause(with, planner_context)?; } - let pipe_operators = query.pipe_operators.clone(); - let set_expr = *query.body; let plan = match set_expr { SetExpr::Select(mut select) => { From 30aea530bad42c8f231ffe12b407ab4f8d411a84 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 5 Oct 2025 20:15:47 +0200 Subject: [PATCH 17/20] revert parquet-testing --- parquet-testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-testing b/parquet-testing index a3d96a65e11e..107b36603e05 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit a3d96a65e11e2bbca7d22a894e8313ede90a33a3 +Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0 From 1a1e942a27ec6646cb21abe81dc0b712ea7a1f19 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 5 Oct 2025 20:16:14 +0200 Subject: [PATCH 18/20] remove simon --- simon/data.csv | 3 --- simon/query.sql | 10 ---------- 2 files changed, 13 deletions(-) delete mode 100644 simon/data.csv delete mode 100644 simon/query.sql diff --git a/simon/data.csv b/simon/data.csv deleted file mode 100644 index eba8ed27a30f..000000000000 --- a/simon/data.csv +++ /dev/null @@ -1,3 +0,0 @@ -afdeling,budget,new_budget -a,2,2 -b,1,2 diff --git a/simon/query.sql b/simon/query.sql deleted file mode 100644 index 0f2d0d7f6f3d..000000000000 --- a/simon/query.sql +++ /dev/null @@ -1,10 +0,0 @@ -set datafusion.sql_parser.dialect = 'BigQuery'; - -CREATE EXTERNAL TABLE test -STORED AS CSV -LOCATION '/Users/svs/code/datafusion/simon/data.csv' -OPTIONS ('has_header' 'true'); - -select * from test -|> where afdeling = 'a' -|> extend budget - new_budget as diff From 89582b33800be26281c25a9f7c09f07cc190e052 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 5 Oct 2025 20:19:54 +0200 Subject: [PATCH 19/20] move docs to select.md --- docs/source/user-guide/sql/operators.md | 240 ------------------------ docs/source/user-guide/sql/select.md | 54 ++++++ 2 files changed, 54 insertions(+), 240 deletions(-) diff --git a/docs/source/user-guide/sql/operators.md b/docs/source/user-guide/sql/operators.md index 103b10b1287d..b63f55239621 100644 --- a/docs/source/user-guide/sql/operators.md +++ b/docs/source/user-guide/sql/operators.md @@ -613,243 +613,3 @@ bar") | bar | +-----------------+ ``` - -## Pipe operators - -Some SQL dialects (e.g. BigQuery) support the pipe operator `|>`. -The SQL dialect can be set like this: - -```sql -set datafusion.sql_parser.dialect = 'BigQuery'; -``` - -DataFusion currently supports the following pipe operators: - -- [WHERE](#pipe_where) -- [ORDER BY](#pipe_order_by) -- [LIMIT](#pipe_limit) -- [SELECT](#pipe_select) -- [EXTEND](#pipe_extend) -- [AS](#pipe_as) -- [JOIN](#pipe_join) -- [UNION](#pipe_union) -- [INTERSECT](#pipe_intersect) -- [EXCEPT](#pipe_except) -- [AGGREGATE](#pipe_aggregate) -- [PIVOT](#pipe_pivot) - -(pipe_where)= - -### WHERE - -```sql -> select * from range(0,10) -|> where value < 2; -+-------+ -| value | -+-------+ -| 0 | -| 1 | -+-------+ -``` - -(pipe_order_by)= - -### ORDER BY - -```sql -> select * from range(0,3) -|> order by value desc; -+-------+ -| value | -+-------+ -| 2 | -| 1 | -| 0 | -+-------+ -``` - -(pipe_limit)= - -### LIMIT - -```sql -> select * from range(0,3) -|> order by value desc -|> limit 1; -+-------+ -| value | -+-------+ -| 2 | -+-------+ -``` - -(pipe_select)= - -### SELECT - -```sql -> select * from range(0,3) -|> select value + 10; -+---------------------------+ -| range().value + Int64(10) | -+---------------------------+ -| 10 | -| 11 | -| 12 | -+---------------------------+ -``` - -(pipe_extend)= - -### EXTEND - -```sql -> select * from range(0,3) -|> extend -value AS minus_value; -+-------+-------------+ -| value | minus_value | -+-------+-------------+ -| 0 | 0 | -| 1 | -1 | -| 2 | -2 | -+-------+-------------+ -``` - -(pipe_as)= - -### AS - -```sql -> select * from range(0,3) -|> as my_range -|> SELECT my_range.value; -+-------+ -| value | -+-------+ -| 0 | -| 1 | -| 2 | -+-------+ -``` - -(pipe_join)= - -### JOIN - -```sql -> ( - SELECT 'apples' AS item, 2 AS sales - UNION ALL - SELECT 'bananas' AS item, 5 AS sales -) -|> AS produce_sales -|> LEFT JOIN - ( - SELECT 'apples' AS item, 123 AS id - ) AS produce_data - ON produce_sales.item = produce_data.item -|> SELECT produce_sales.item, sales, id; -+--------+-------+------+ -| item | sales | id | -+--------+-------+------+ -| apples | 2 | 123 | -| bananas| 5 | NULL | -+--------+-------+------+ -``` - -(pipe_union)= - -### UNION - -```sql -> select * from range(0,3) -|> union all ( - select * from range(3,6) -); -+-------+ -| value | -+-------+ -| 0 | -| 1 | -| 2 | -| 3 | -| 4 | -| 5 | -+-------+ -``` - -(pipe_intersect)= - -### INTERSECT - -```sql -> select * from range(0,100) -|> INTERSECT DISTINCT ( - select 3 -); -+-------+ -| value | -+-------+ -| 3 | -+-------+ -``` - -(pipe_except)= - -### EXCEPT - -```sql -> select * from range(0,10) -|> EXCEPT DISTINCT (select * from range(5,10)); -+-------+ -| value | -+-------+ -| 0 | -| 1 | -| 2 | -| 3 | -| 4 | -+-------+ -``` - -(pipe_aggregate)= - -### AGGREGATE - -```sql -> select * from range(0,3) -|> aggregate sum(value) AS total; -+-------+ -| total | -+-------+ -| 3 | -+-------+ -``` - -(pipe_pivot)= - -### PIVOT - -Rotates rows into columns. - -```sql -> ( - SELECT 'kale' AS product, 51 AS sales, 'Q1' AS quarter - UNION ALL - SELECT 'kale' AS product, 4 AS sales, 'Q1' AS quarter - UNION ALL - SELECT 'kale' AS product, 45 AS sales, 'Q2' AS quarter - UNION ALL - SELECT 'apple' AS product, 8 AS sales, 'Q1' AS quarter - UNION ALL - SELECT 'apple' AS product, 10 AS sales, 'Q2' AS quarter -) -|> PIVOT(SUM(sales) FOR quarter IN ('Q1', 'Q2')); -+---------+-----+-----+ -| product | Q1 | Q2 | -+---------+-----+-----+ -| apple | 8 | 10 | -| kale | 55 | 45 | -+---------+-----+-----+ -``` diff --git a/docs/source/user-guide/sql/select.md b/docs/source/user-guide/sql/select.md index 8c1bc401d3aa..e854d67cc1f6 100644 --- a/docs/source/user-guide/sql/select.md +++ b/docs/source/user-guide/sql/select.md @@ -350,6 +350,8 @@ DataFusion currently supports the following pipe operators: - [INTERSECT](#pipe_intersect) - [EXCEPT](#pipe_except) - [AGGREGATE](#pipe_aggregate) +- [PIVOT](#pipe_pivot) +- [JOIN](#pipe_join) (pipe_where)= @@ -514,3 +516,55 @@ select * from range(0,3) | 3 | +-------+ ``` + +(pipe_pivot)= + +### PIVOT + +Rotates rows into columns. + +```sql +> ( + SELECT 'kale' AS product, 51 AS sales, 'Q1' AS quarter + UNION ALL + SELECT 'kale' AS product, 4 AS sales, 'Q1' AS quarter + UNION ALL + SELECT 'kale' AS product, 45 AS sales, 'Q2' AS quarter + UNION ALL + SELECT 'apple' AS product, 8 AS sales, 'Q1' AS quarter + UNION ALL + SELECT 'apple' AS product, 10 AS sales, 'Q2' AS quarter +) +|> PIVOT(SUM(sales) FOR quarter IN ('Q1', 'Q2')); ++---------+-----+-----+ +| product | Q1 | Q2 | ++---------+-----+-----+ +| apple | 8 | 10 | +| kale | 55 | 45 | ++---------+-----+-----+ +``` + +(pipe_join)= + +### JOIN + +```sql +> ( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales +) +|> AS produce_sales +|> LEFT JOIN + ( + SELECT 'apples' AS item, 123 AS id + ) AS produce_data + ON produce_sales.item = produce_data.item +|> SELECT produce_sales.item, sales, id; ++--------+-------+------+ +| item | sales | id | ++--------+-------+------+ +| apples | 2 | 123 | +| bananas| 5 | NULL | ++--------+-------+------+ +``` From 6d5085ff4fd09fc2b7bb96e72c998edbee2c9fed Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Sun, 5 Oct 2025 20:22:44 +0200 Subject: [PATCH 20/20] remove unnecessary comments --- datafusion/sql/src/query.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 1b15899fddd8..c725b49bc303 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -363,7 +363,6 @@ impl SqlToRel<'_, S> { alias: Option, planner_context: &mut PlannerContext, ) -> Result { - // Extract pivot values from the value source let pivot_values = if let PivotValueSource::List(values) = value_source { values } else { @@ -372,7 +371,6 @@ impl SqlToRel<'_, S> { ); }; - // Convert pivot column to DataFusion expression if value_column.len() != 1 { return not_impl_err!("Multi-column pivot is not supported yet"); } @@ -408,7 +406,6 @@ impl SqlToRel<'_, S> { } } - // Create aggregate expressions for each pivot value let mut aggr_exprs = Vec::new(); // For each pivot value and aggregate function combination, create a conditional aggregate @@ -469,7 +466,6 @@ impl SqlToRel<'_, S> { } } - // Create the aggregate logical plan let result_plan = LogicalPlanBuilder::from(plan) .aggregate(group_by_cols, aggr_exprs)? .build()?;