Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 111 additions & 2 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr,
OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows,
OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
SetOperator, SetQuantifier, TableAlias,
};
use sqlparser::tokenizer::Span;

Expand Down Expand Up @@ -146,11 +147,80 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.collect();
self.project(plan, all_exprs)
}
PipeOperator::As { alias } => self.apply_table_alias(
plan,
TableAlias {
name: alias,
// Apply to all fields
columns: vec![],
},
),
PipeOperator::Union {
set_quantifier,
queries,
} => self.pipe_operator_set(
plan,
SetOperator::Union,
set_quantifier,
queries,
planner_context,
),
PipeOperator::Intersect {
set_quantifier,
queries,
} => self.pipe_operator_set(
plan,
SetOperator::Intersect,
set_quantifier,
queries,
planner_context,
),
PipeOperator::Except {
set_quantifier,
queries,
} => self.pipe_operator_set(
plan,
SetOperator::Except,
set_quantifier,
queries,
planner_context,
),
PipeOperator::Aggregate {
full_table_exprs,
group_by_expr,
} => self.pipe_operator_aggregate(
plan,
full_table_exprs,
group_by_expr,
planner_context,
),

x => not_impl_err!("`{x}` pipe operator is not supported yet"),
}
}

/// Plan a `UNION` / `INTERSECT` / `EXCEPT` pipe operator.
///
/// Each query in `queries` is planned in order and combined with the
/// running plan through `set_operation_to_plan`, using the same
/// `set_operator` and `set_quantifier` every time. The result of one step
/// becomes the left input of the next; the very first left input is `plan`.
/// When `queries` is empty, `plan` is returned unchanged.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while planning a query
/// or while combining it with the accumulated plan.
fn pipe_operator_set(
    &self,
    plan: LogicalPlan,
    set_operator: SetOperator,
    set_quantifier: SetQuantifier,
    queries: Vec<Query>,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    // Left fold over the right-hand queries; `try_fold` short-circuits on
    // the first `Err`, exactly like `?` inside a loop would.
    queries.into_iter().try_fold(plan, |left_plan, query| {
        let right_plan = self.query_to_plan(query, planner_context)?;
        self.set_operation_to_plan(set_operator, left_plan, right_plan, set_quantifier)
    })
}

/// Wrap a plan in a limit
fn limit(
&self,
Expand Down Expand Up @@ -227,6 +297,45 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}

/// Plan the `AGGREGATE` pipe operator.
///
/// `full_table_exprs` become the aggregate expressions and `group_by_expr`
/// the grouping expressions of an aggregate node built on top of `plan`.
/// Every item is converted with `sql_to_expr` against the schema of `plan`;
/// when the item carries an alias, the converted expression is passed
/// through `alias_if_changed` with that alias.
///
/// Only the `expr` part of each `ExprWithAliasAndOrderBy` is read here; the
/// remaining part of the item is not consulted by this function.
///
/// # Errors
///
/// Returns the first error raised while converting an expression, applying
/// an alias, or building the aggregate plan.
fn pipe_operator_aggregate(
    &self,
    plan: LogicalPlan,
    full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
    group_by_expr: Vec<ExprWithAliasAndOrderBy>,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    let input_schema = plan.schema();

    // Lowers a whole list of SQL items into DataFusion expressions,
    // bailing out on the first failure.
    let lower_all = |items: Vec<ExprWithAliasAndOrderBy>,
                     ctx: &mut PlannerContext|
     -> Result<Vec<Expr>> {
        let mut lowered = Vec::with_capacity(items.len());
        for item in items {
            let aliased = item.expr;
            let planned = self.sql_to_expr(aliased.expr, input_schema, ctx)?;
            let finished = match aliased.alias {
                Some(ident) => planned.alias_if_changed(ident.value)?,
                None => planned,
            };
            lowered.push(finished);
        }
        Ok(lowered)
    };

    // Aggregate expressions are lowered before the grouping expressions.
    let aggregates = lower_all(full_table_exprs, planner_context)?;
    let grouping = lower_all(group_by_expr, planner_context)?;

    LogicalPlanBuilder::from(plan)
        .aggregate(grouping, aggregates)?
        .build()
}

/// Wrap the logical plan in a `SelectInto`
fn select_into(
&self,
Expand Down
88 changes: 88 additions & 0 deletions datafusion/sqllogictest/test_files/pipe_operator.slt
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,91 @@ FROM test
|> EXTEND a + b AS a_plus_b
----
1 1.1 2.1

# AS pipe: the alias introduced by `|> as` can qualify columns in later operators
query I
SELECT *
FROM test
|> as test_pipe
|> select test_pipe.a
----
1
2
3

# UNION pipe
# `rowsort` is used because UNION ALL does not promise any output order
query I rowsort
SELECT *
FROM test
|> select a
|> UNION ALL (
SELECT a FROM test
);
----
1
1
2
2
3
3

# INTERSECT pipe
query I rowsort
SELECT * FROM range(0,3)
|> INTERSECT DISTINCT
(SELECT * FROM range(1,3));
----
1
2

# EXCEPT pipe
query I rowsort
select * from range(0,10)
|> EXCEPT DISTINCT (select * from range(5,10));
----
0
1
2
3
4

# AGGREGATE pipe without GROUP BY: a single row over the whole input
query II
(
SELECT 'apples' AS item, 2 AS sales
UNION ALL
SELECT 'bananas' AS item, 5 AS sales
UNION ALL
SELECT 'apples' AS item, 7 AS sales
)
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales;
----
3 14

# AGGREGATE pipe with GROUP BY: grouping column first, then the aggregates
query TII rowsort
(
SELECT 'apples' AS item, 2 AS sales
UNION ALL
SELECT 'bananas' AS item, 5 AS sales
UNION ALL
SELECT 'apples' AS item, 7 AS sales
)
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
GROUP BY item;
----
apples 2 9
bananas 1 5

# AGGREGATE pipe followed by WHERE: aggregate aliases are visible downstream
query TII rowsort
(
SELECT 'apples' AS item, 2 AS sales
UNION ALL
SELECT 'bananas' AS item, 5 AS sales
UNION ALL
SELECT 'apples' AS item, 7 AS sales
)
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
GROUP BY item
|> WHERE num_items > 1;
----
apples 2 9
91 changes: 91 additions & 0 deletions docs/source/user-guide/sql/select.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ DataFusion currently supports the following pipe operators:
- [LIMIT](#pipe_limit)
- [SELECT](#pipe_select)
- [EXTEND](#pipe_extend)
- [AS](#pipe_as)
- [UNION](#pipe_union)
- [INTERSECT](#pipe_intersect)
- [EXCEPT](#pipe_except)
- [AGGREGATE](#pipe_aggregate)

(pipe_where)=

Expand Down Expand Up @@ -423,3 +428,89 @@ select * from range(0,3)
| 2 | -2 |
+-------+-------------+
```

(pipe_as)=

### AS

```sql
select * from range(0,3)
|> as my_range
|> SELECT my_range.value;
+-------+
| value |
+-------+
| 0 |
| 1 |
| 2 |
+-------+
```

(pipe_union)=

### UNION

```sql
select * from range(0,3)
|> union all (
select * from range(3,6)
);
+-------+
| value |
+-------+
| 0 |
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
+-------+
```

(pipe_intersect)=

### INTERSECT

```sql
select * from range(0,100)
|> INTERSECT DISTINCT (
select 3
);
+-------+
| value |
+-------+
| 3 |
+-------+
```

(pipe_except)=

### EXCEPT

```sql
select * from range(0,10)
|> EXCEPT DISTINCT (select * from range(5,10));
+-------+
| value |
+-------+
| 0 |
| 1 |
| 2 |
| 3 |
| 4 |
+-------+
```

(pipe_aggregate)=

### AGGREGATE

```sql
select * from range(0,3)
|> aggregate sum(value) AS total;
+-------+
| total |
+-------+
| 3 |
+-------+
```