Skip to content
Merged
78 changes: 74 additions & 4 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel};

use crate::stack::StackGuard;
use datafusion_common::{not_impl_err, Constraints, DFSchema, Result};
use datafusion_expr::expr::Sort;
use datafusion_expr::expr::{Sort, WildcardOptions};

use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, Query,
SelectInto, SetExpr,
Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr,
OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
};
use sqlparser::tokenizer::Span;

Expand All @@ -49,7 +50,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

let set_expr = *query.body;
match set_expr {
let plan = match set_expr {
SetExpr::Select(mut select) => {
let select_into = select.into.take();
let plan =
Expand Down Expand Up @@ -78,6 +79,75 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let plan = self.order_by(plan, order_by_rex)?;
self.limit(plan, query.limit_clause, planner_context)
}
}?;

self.pipe_operators(plan, query.pipe_operators, planner_context)
}

/// Apply a sequence of pipe operators to a plan.
///
/// The operators are applied in the order given: each one is planned
/// against the plan produced by the previous one, starting from `plan`.
/// Planning stops at the first operator that fails, and that error is
/// returned unchanged.
fn pipe_operators(
    &self,
    plan: LogicalPlan,
    pipe_operators: Vec<PipeOperator>,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    pipe_operators
        .into_iter()
        .try_fold(plan, |current, operator| {
            self.pipe_operator(current, operator, planner_context)
        })
}

/// Apply a single pipe operator to a plan.
///
/// `WHERE`, `ORDER BY`, `LIMIT`, `SELECT` and `EXTEND` are handled here;
/// any other operator is rejected through `not_impl_err!`.
fn pipe_operator(
    &self,
    plan: LogicalPlan,
    pipe_operator: PipeOperator,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    match pipe_operator {
        PipeOperator::Where { expr } => {
            let predicate = Some(expr);
            self.plan_selection(predicate, plan, planner_context)
        }
        PipeOperator::OrderBy { exprs } => {
            let sort_keys = self.order_by_to_sort_expr(
                exprs,
                plan.schema(),
                planner_context,
                true,
                None,
            )?;
            self.order_by(plan, sort_keys)
        }
        PipeOperator::Limit { expr, offset } => {
            // Re-express the pipe LIMIT as a regular LIMIT/OFFSET clause so
            // the existing `limit` planning can be reused.
            let offset = offset.map(|value| Offset {
                value,
                rows: OffsetRows::None,
            });
            let clause = LimitClause::LimitOffset {
                limit: Some(expr),
                offset,
                limit_by: Vec::new(),
            };
            self.limit(plan, Some(clause), planner_context)
        }
        PipeOperator::Select { exprs } => {
            let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
            let projection =
                self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
            self.project(plan, projection)
        }
        PipeOperator::Extend { exprs } => {
            let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
            let appended =
                self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
            // Project a leading wildcard followed by the new expressions.
            let mut projection =
                vec![SelectExpr::Wildcard(WildcardOptions::default())];
            projection.extend(appended);
            self.project(plan, projection)
        }

        x => not_impl_err!("`{x}` pipe operator is not supported yet"),
    }
}

Expand Down
10 changes: 7 additions & 3 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok((intermediate_plan, intermediate_select_exprs))
}

fn plan_selection(
pub(crate) fn plan_selection(
&self,
selection: Option<SQLExpr>,
plan: LogicalPlan,
Expand Down Expand Up @@ -666,7 +666,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

/// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
fn prepare_select_exprs(
pub(crate) fn prepare_select_exprs(
&self,
plan: &LogicalPlan,
projection: Vec<SelectItem>,
Expand Down Expand Up @@ -826,7 +826,11 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

/// Wrap a plan in a projection
fn project(&self, input: LogicalPlan, expr: Vec<SelectExpr>) -> Result<LogicalPlan> {
pub(crate) fn project(
&self,
input: LogicalPlan,
expr: Vec<SelectExpr>,
) -> Result<LogicalPlan> {
// convert to Expr for validate_schema_satisfies_exprs
let exprs = expr
.iter()
Expand Down
91 changes: 91 additions & 0 deletions datafusion/sqllogictest/test_files/pipe_operator.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# BigQuery supports the pipe operator syntax
# TODO: Make the Generic dialect support the pipe operator syntax
statement ok
set datafusion.sql_parser.dialect = 'BigQuery';

statement ok
CREATE TABLE test(
a INT,
b FLOAT,
c VARCHAR,
n VARCHAR
) AS VALUES
(1, 1.1, 'a', NULL),
(2, 2.2, 'b', NULL),
(3, 3.3, 'c', NULL)
;

# WHERE pipe
query IRTT
SELECT *
FROM test
|> WHERE a > 1
----
2 2.2 b NULL
3 3.3 c NULL

# ORDER BY pipe
query IRTT
SELECT *
FROM test
|> ORDER BY a DESC
----
3 3.3 c NULL
2 2.2 b NULL
1 1.1 a NULL

# ORDER BY pipe, limit
query IRTT
SELECT *
FROM test
|> ORDER BY a DESC
|> LIMIT 1
----
3 3.3 c NULL

# SELECT pipe
query I
SELECT *
FROM test
|> SELECT a
----
1
2
3

# EXTEND pipe
query IRR
SELECT *
FROM test
|> SELECT a, b
|> EXTEND a + b AS a_plus_b
----
1 1.1 2.1
2 2.2 4.2
3 3.3 6.3

query IRR
SELECT *
FROM test
|> SELECT a, b
|> where a = 1
|> EXTEND a + b AS a_plus_b
----
1 1.1 2.1
96 changes: 96 additions & 0 deletions docs/source/user-guide/sql/select.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ DataFusion supports the following syntax for queries:
[ [ORDER BY](#order-by-clause) expression [ ASC | DESC ][, ...] ] <br/>
[ [LIMIT](#limit-clause) count ] <br/>
[ [EXCLUDE | EXCEPT](#exclude-and-except-clause) ] <br/>
[ [Pipe operators](#pipe-operators) ] <br/>

</code>

Expand Down Expand Up @@ -327,3 +328,98 @@ FROM table;
SELECT * EXCLUDE(age, person)
FROM table;
```

## Pipe operators

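Some SQL dialects (e.g. BigQuery) support the pipe operator `|>`, which is
appended after a query. When several pipe operators are chained, they are
applied in order, each one to the result of the previous step.
To use pipe operators, select a dialect that supports them, for example: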

```sql
set datafusion.sql_parser.dialect = 'BigQuery';
```

DataFusion currently supports the following pipe operators:

- [WHERE](#pipe_where)
- [ORDER BY](#pipe_order_by)
- [LIMIT](#pipe_limit)
- [SELECT](#pipe_select)
- [EXTEND](#pipe_extend)

(pipe_where)=

### WHERE

```sql
select * from range(0,10)
|> where value < 2;
+-------+
| value |
+-------+
| 0 |
| 1 |
+-------+
```

(pipe_order_by)=

### ORDER BY

```sql
select * from range(0,3)
|> order by value desc;
+-------+
| value |
+-------+
| 2 |
| 1 |
| 0 |
+-------+
```

(pipe_limit)=

### LIMIT

```sql
select * from range(0,3)
|> order by value desc
|> limit 1;
+-------+
| value |
+-------+
| 2 |
+-------+
```

(pipe_select)=

### SELECT

```sql
select * from range(0,3)
|> select value + 10;
+---------------------------+
| range().value + Int64(10) |
+---------------------------+
| 10 |
| 11 |
| 12 |
+---------------------------+
```

(pipe_extend)=

### EXTEND

```sql
select * from range(0,3)
|> extend -value AS minus_value;
+-------+-------------+
| value | minus_value |
+-------+-------------+
| 0 | 0 |
| 1 | -1 |
| 2 | -2 |
+-------+-------------+
```
Loading