diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index c06147e08f87..9f8b483b8fef 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -21,14 +21,15 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::stack::StackGuard; use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; -use datafusion_expr::expr::Sort; +use datafusion_expr::expr::{Sort, WildcardOptions}; +use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, Query, - SelectInto, SetExpr, + Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr, + OrderByKind, PipeOperator, Query, SelectInto, SetExpr, }; use sqlparser::tokenizer::Span; @@ -49,7 +50,7 @@ impl SqlToRel<'_, S> { } let set_expr = *query.body; - match set_expr { + let plan = match set_expr { SetExpr::Select(mut select) => { let select_into = select.into.take(); let plan = @@ -78,6 +79,75 @@ impl SqlToRel<'_, S> { let plan = self.order_by(plan, order_by_rex)?; self.limit(plan, query.limit_clause, planner_context) } + }?; + + self.pipe_operators(plan, query.pipe_operators, planner_context) + } + + /// Apply pipe operators to a plan + fn pipe_operators( + &self, + mut plan: LogicalPlan, + pipe_operators: Vec, + planner_context: &mut PlannerContext, + ) -> Result { + for pipe_operator in pipe_operators { + plan = self.pipe_operator(plan, pipe_operator, planner_context)?; + } + Ok(plan) + } + + /// Apply a pipe operator to a plan + fn pipe_operator( + &self, + plan: LogicalPlan, + pipe_operator: PipeOperator, + planner_context: &mut PlannerContext, + ) -> Result { + match pipe_operator { + PipeOperator::Where { expr } => { + self.plan_selection(Some(expr), plan, planner_context) + } + PipeOperator::OrderBy { exprs } => { + let sort_exprs = self.order_by_to_sort_expr( + exprs, + plan.schema(), + planner_context, + true, + None, + )?; + self.order_by(plan, sort_exprs) + } + PipeOperator::Limit { expr, offset } => self.limit( + plan, + Some(LimitClause::LimitOffset { + limit: Some(expr), + offset: offset.map(|offset| Offset { + value: offset, + rows: OffsetRows::None, + }), + limit_by: vec![], + }), + planner_context, + ), + PipeOperator::Select { exprs } => { + let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_)); + let select_exprs = + self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?; + self.project(plan, select_exprs) + } + PipeOperator::Extend { exprs } => { + let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_)); + let extend_exprs = + self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?; + let all_exprs = + std::iter::once(SelectExpr::Wildcard(WildcardOptions::default())) + .chain(extend_exprs) + .collect(); + self.project(plan, all_exprs) + } + + x => not_impl_err!("`{x}` pipe operator is not supported yet"), } } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 26dbf45fbce1..42013a76a865 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -585,7 +585,7 @@ impl SqlToRel<'_, S> { Ok((intermediate_plan, intermediate_select_exprs)) } - fn plan_selection( + pub(crate) fn plan_selection( &self, selection: Option, plan: LogicalPlan, @@ -666,7 +666,7 @@ impl SqlToRel<'_, S> { } /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions. - fn prepare_select_exprs( + pub(crate) fn prepare_select_exprs( &self, plan: &LogicalPlan, projection: Vec, @@ -826,7 +826,11 @@ impl SqlToRel<'_, S> { } /// Wrap a plan in a projection - fn project(&self, input: LogicalPlan, expr: Vec) -> Result { + pub(crate) fn project( + &self, + input: LogicalPlan, + expr: Vec, + ) -> Result { // convert to Expr for validate_schema_satisfies_exprs let exprs = expr .iter() diff --git a/datafusion/sqllogictest/test_files/pipe_operator.slt b/datafusion/sqllogictest/test_files/pipe_operator.slt new file mode 100644 index 000000000000..6b92df943138 --- /dev/null +++ b/datafusion/sqllogictest/test_files/pipe_operator.slt @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# BigQuery supports the pipe operator syntax +# TODO: Make the Generic dialect support the pipe operator syntax +statement ok +set datafusion.sql_parser.dialect = 'BigQuery'; + +statement ok +CREATE TABLE test( + a INT, + b FLOAT, + c VARCHAR, + n VARCHAR +) AS VALUES + (1, 1.1, 'a', NULL), + (2, 2.2, 'b', NULL), + (3, 3.3, 'c', NULL) +; + +# WHERE pipe +query IRTT +SELECT * +FROM test +|> WHERE a > 1 +---- +2 2.2 b NULL +3 3.3 c NULL + +# ORDER BY pipe +query IRTT +SELECT * +FROM test +|> ORDER BY a DESC +---- +3 3.3 c NULL +2 2.2 b NULL +1 1.1 a NULL + +# ORDER BY pipe, limit +query IRTT +SELECT * +FROM test +|> ORDER BY a DESC +|> LIMIT 1 +---- +3 3.3 c NULL + +# SELECT pipe +query I +SELECT * +FROM test +|> SELECT a +---- +1 +2 +3 + +# EXTEND pipe +query IRR +SELECT * +FROM test +|> SELECT a, b +|> EXTEND a + b AS a_plus_b +---- +1 1.1 2.1 +2 2.2 4.2 +3 3.3 6.3 + +query IRR +SELECT * +FROM test +|> SELECT a, b +|> where a = 1 +|> EXTEND a + b AS a_plus_b +---- +1 1.1 2.1 diff --git a/docs/source/user-guide/sql/select.md b/docs/source/user-guide/sql/select.md index 39163cf492a4..87e940245b25 100644 --- a/docs/source/user-guide/sql/select.md +++ b/docs/source/user-guide/sql/select.md @@ -40,6 +40,7 @@ DataFusion supports the following syntax for queries: [ [ORDER BY](#order-by-clause) expression [ ASC | DESC ][, ...] ]
[ [LIMIT](#limit-clause) count ]
[ [EXCLUDE | EXCEPT](#exclude-and-except-clause) ]
+[Pipe operators](#pipe-operators)
@@ -327,3 +328,98 @@ FROM table; SELECT * EXCLUDE(age, person) FROM table; ``` + +## Pipe operators + +Some SQL dialects (e.g. BigQuery) support the pipe operator `|>`. +The SQL dialect can be set like this: + +```sql +set datafusion.sql_parser.dialect = 'BigQuery'; +``` + +DataFusion currently supports the following pipe operators: + +- [WHERE](#pipe_where) +- [ORDER BY](#pipe_order_by) +- [LIMIT](#pipe_limit) +- [SELECT](#pipe_select) +- [EXTEND](#pipe_extend) + +(pipe_where)= + +### WHERE + +```sql +select * from range(0,10) +|> where value < 2; ++-------+ +| value | ++-------+ +| 0 | +| 1 | ++-------+ +``` + +(pipe_order_by)= + +### ORDER BY + +```sql +select * from range(0,3) +|> order by value desc; ++-------+ +| value | ++-------+ +| 2 | +| 1 | +| 0 | ++-------+ +``` + +(pipe_limit)= + +### LIMIT + +```sql +select * from range(0,3) +|> order by value desc +|> limit 1; ++-------+ +| value | ++-------+ +| 2 | ++-------+ +``` + +(pipe_select)= + +### SELECT + +```sql +select * from range(0,3) +|> select value + 10; ++---------------------------+ +| range().value + Int64(10) | ++---------------------------+ +| 10 | +| 11 | +| 12 | ++---------------------------+ +``` + +(pipe_extend)= + +### EXTEND + +```sql +select * from range(0,3) +|> extend -value AS minus_value; ++-------+-------------+ +| value | minus_value | ++-------+-------------+ +| 0 | 0 | +| 1 | -1 | +| 2 | -2 | ++-------+-------------+ +```