diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 9f8b483b8fef..633d933eb845 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -28,8 +28,9 @@ use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr, - OrderByKind, PipeOperator, Query, SelectInto, SetExpr, + Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows, + OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr, + SetOperator, SetQuantifier, TableAlias, }; use sqlparser::tokenizer::Span; @@ -146,11 +147,80 @@ impl SqlToRel<'_, S> { .collect(); self.project(plan, all_exprs) } + PipeOperator::As { alias } => self.apply_table_alias( + plan, + TableAlias { + name: alias, + // Apply to all fields + columns: vec![], + }, + ), + PipeOperator::Union { + set_quantifier, + queries, + } => self.pipe_operator_set( + plan, + SetOperator::Union, + set_quantifier, + queries, + planner_context, + ), + PipeOperator::Intersect { + set_quantifier, + queries, + } => self.pipe_operator_set( + plan, + SetOperator::Intersect, + set_quantifier, + queries, + planner_context, + ), + PipeOperator::Except { + set_quantifier, + queries, + } => self.pipe_operator_set( + plan, + SetOperator::Except, + set_quantifier, + queries, + planner_context, + ), + PipeOperator::Aggregate { + full_table_exprs, + group_by_expr, + } => self.pipe_operator_aggregate( + plan, + full_table_exprs, + group_by_expr, + planner_context, + ), x => not_impl_err!("`{x}` pipe operator is not supported yet"), } } + /// Handle Union/Intersect/Except pipe operators + fn pipe_operator_set( + &self, + mut plan: LogicalPlan, + set_operator: SetOperator, + set_quantifier: SetQuantifier, + queries: Vec, + planner_context: &mut PlannerContext, + ) -> Result { + for query in queries { + let right_plan = self.query_to_plan(query, planner_context)?; + plan = self.set_operation_to_plan( + set_operator, + plan, + right_plan, + set_quantifier, + )?; + } + + Ok(plan) + } + /// Wrap a plan in a limit fn limit( &self, @@ -227,6 +297,45 @@ impl SqlToRel<'_, S> { } } + /// Handle AGGREGATE pipe operator + fn pipe_operator_aggregate( + &self, + plan: LogicalPlan, + full_table_exprs: Vec, + group_by_expr: Vec, + planner_context: &mut PlannerContext, + ) -> Result { + let plan_schema = plan.schema(); + let process_expr = + |expr_with_alias_and_order_by: ExprWithAliasAndOrderBy, + planner_context: &mut PlannerContext| { + let expr_with_alias = expr_with_alias_and_order_by.expr; + let sql_expr = expr_with_alias.expr; + let alias = expr_with_alias.alias; + + let df_expr = self.sql_to_expr(sql_expr, plan_schema, planner_context)?; + + match alias { + Some(alias_ident) => df_expr.alias_if_changed(alias_ident.value), + None => Ok(df_expr), + } + }; + + let aggr_exprs: Vec = full_table_exprs + .into_iter() + .map(|e| process_expr(e, planner_context)) + .collect::>>()?; + + let group_by_exprs: Vec = group_by_expr + .into_iter() + .map(|e| process_expr(e, planner_context)) + .collect::>>()?; + + LogicalPlanBuilder::from(plan) + .aggregate(group_by_exprs, aggr_exprs)? + .build() + } + /// Wrap the logical plan in a `SelectInto` fn select_into( &self, diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt index 6b92df943138..57d1fc064201 100644 --- a/datafusion/sqllogictest/test_files/pipe_operator.slt +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -89,3 +89,91 @@ FROM test |> EXTEND a + b AS a_plus_b ---- 1 1.1 2.1 + +# AS pipe +query I +SELECT * +FROM test +|> as test_pipe +|> select test_pipe.a +---- +1 +2 +3 + +# UNION pipe +query I +SELECT * +FROM test +|> select a +|> UNION ALL ( + SELECT a FROM test +); +---- +1 +2 +3 +1 +2 +3 + +# INTERSECT pipe +query I rowsort +SELECT * FROM range(0,3) +|> INTERSECT DISTINCT + (SELECT * FROM range(1,3)); +---- +1 +2 + +# EXCEPT pipe +query I rowsort +select * from range(0,10) +|> EXCEPT DISTINCT (select * from range(5,10)); +---- +0 +1 +2 +3 +4 + +# AGGREGATE pipe +query II +( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales + UNION ALL + SELECT 'apples' AS item, 7 AS sales +) +|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales; +---- +3 14 + +query TII rowsort +( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales + UNION ALL + SELECT 'apples' AS item, 7 AS sales +) +|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales + GROUP BY item; +---- +apples 2 9 +bananas 1 5 + +query TII rowsort +( + SELECT 'apples' AS item, 2 AS sales + UNION ALL + SELECT 'bananas' AS item, 5 AS sales + UNION ALL + SELECT 'apples' AS item, 7 AS sales +) +|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales + GROUP BY item +|> WHERE num_items > 1; +---- +apples 2 9 diff --git a/docs/source/user-guide/sql/select.md b/docs/source/user-guide/sql/select.md index 87e940245b25..8c1bc401d3aa 100644 --- a/docs/source/user-guide/sql/select.md +++ b/docs/source/user-guide/sql/select.md @@ -345,6 +345,11 @@ DataFusion currently supports the following pipe operators: - [LIMIT](#pipe_limit) - [SELECT](#pipe_select) - [EXTEND](#pipe_extend) +- [AS](#pipe_as) +- [UNION](#pipe_union) +- [INTERSECT](#pipe_intersect) +- [EXCEPT](#pipe_except) +- [AGGREGATE](#pipe_aggregate) (pipe_where)= @@ -423,3 +428,89 @@ select * from range(0,3) | 2 | -2 | +-------+-------------+ ``` + +(pipe_as)= + +### AS + +```sql +select * from range(0,3) +|> as my_range +|> SELECT my_range.value; ++-------+ +| value | ++-------+ +| 0 | +| 1 | +| 2 | ++-------+ +``` + +(pipe_union)= + +### UNION + +```sql +select * from range(0,3) +|> union all ( + select * from range(3,6) +); ++-------+ +| value | ++-------+ +| 0 | +| 1 | +| 2 | +| 3 | +| 4 | +| 5 | ++-------+ +``` + +(pipe_intersect)= + +### INTERSECT + +```sql +select * from range(0,100) +|> INTERSECT DISTINCT ( + select 3 +); ++-------+ +| value | ++-------+ +| 3 | ++-------+ +``` + +(pipe_except)= + +### EXCEPT + +```sql +select * from range(0,10) +|> EXCEPT DISTINCT (select * from range(5,10)); ++-------+ +| value | ++-------+ +| 0 | +| 1 | +| 2 | +| 3 | +| 4 | ++-------+ +``` + +(pipe_aggregate)= + +### AGGREGATE + +```sql +select * from range(0,3) +|> aggregate sum(value) AS total; ++-------+ +| total | ++-------+ +| 3 | ++-------+ +```