Skip to content

Commit

Permalink
Add LogicalPlan::Distinct (#2792)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrob95 authored Jun 28, 2022
1 parent d5a9b74 commit 839a618
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 39 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub use datafusion_expr::{
logical_plan::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
Expand Down
19 changes: 16 additions & 3 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
Window,
Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias,
TableScan, Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
Expand Down Expand Up @@ -59,7 +59,8 @@ use arrow::datatypes::DataType;
use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::ScalarValue;
use datafusion_expr::{expr::GroupingSet, utils::expr_to_columns};
use datafusion_expr::expr::GroupingSet;
use datafusion_expr::utils::{expand_wildcard, expr_to_columns};
use datafusion_physical_expr::expressions::Literal;
use datafusion_sql::utils::window_expr_common_partition_keys;
use futures::future::BoxFuture;
Expand Down Expand Up @@ -616,6 +617,18 @@ impl DefaultPhysicalPlanner {
physical_input_schema.clone(),
)?) )
}
LogicalPlan::Distinct(Distinct {input}) => {
// Convert distinct to groupby with no aggregations
let group_expr = expand_wildcard(input.schema(), input)?;
let aggregate = LogicalPlan::Aggregate(Aggregate {
input: input.clone(),
group_expr,
aggr_expr: vec![],
schema: input.schema().clone()
}
);
Ok(self.create_initial_plan(&aggregate, session_state).await?)
}
LogicalPlan::Projection(Projection { input, expr, .. }) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
let input_schema = input.as_ref().schema();
Expand Down
71 changes: 63 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::utils::{
use crate::{and, binary_expr, Operator};
use crate::{
logical_plan::{
Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join,
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection,
Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values,
Window,
Expand All @@ -42,7 +42,6 @@ use datafusion_common::{
};
use std::any::Any;
use std::convert::TryFrom;
use std::iter;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -437,16 +436,27 @@ impl LogicalPlanBuilder {

/// Apply a union, removing duplicate rows
pub fn union_distinct(&self, plan: LogicalPlan) -> Result<Self> {
self.union(plan)?.distinct()
// unwrap top-level Distincts, to avoid duplication
let left_plan = self.plan.clone();
let left_plan: LogicalPlan = match left_plan {
LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
_ => left_plan,
};
let right_plan: LogicalPlan = match plan {
LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
_ => plan,
};

Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(union_with_alias(left_plan, right_plan, None)?),
})))
}

/// Apply deduplication: Only distinct (different) values are returned)
pub fn distinct(&self) -> Result<Self> {
let projection_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
let plan = LogicalPlanBuilder::from(self.plan.clone())
.aggregate(projection_expr, iter::empty::<Expr>())?
.build()?;
Self::from(plan).project(vec![Expr::Wildcard])
Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(self.plan.clone()),
})))
}

/// Apply a join with on constraint.
Expand Down Expand Up @@ -1141,6 +1151,51 @@ mod tests {
Ok(())
}

#[test]
fn plan_builder_union_distinct_combined_single_union() -> Result<()> {
let plan =
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

let plan = plan
.union_distinct(plan.build()?)?
.union_distinct(plan.build()?)?
.union_distinct(plan.build()?)?
.build()?;

// output has only one union
let expected = "\
Distinct:\
\n Union\
\n TableScan: employee_csv projection=[state, salary]\
\n TableScan: employee_csv projection=[state, salary]\
\n TableScan: employee_csv projection=[state, salary]\
\n TableScan: employee_csv projection=[state, salary]";

assert_eq!(expected, format!("{:?}", plan));

Ok(())
}

#[test]
fn plan_builder_simple_distinct() -> Result<()> {
let plan =
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
.filter(col("state").eq(lit("CO")))?
.project(vec![col("id")])?
.distinct()?
.build()?;

let expected = "\
Distinct:\
\n Projection: #employee_csv.id\
\n Filter: #employee_csv.state = Utf8(\"CO\")\
\n TableScan: employee_csv projection=[id, state]";

assert_eq!(expected, format!("{:?}", plan));

Ok(())
}

#[test]
fn exists_subquery() -> Result<()> {
let foo = test_table_scan_with_name("foo")?;
Expand Down
9 changes: 5 additions & 4 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ mod plan;
pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, StringifiedPlan,
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, Window,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Values, Window,
};

pub use display::display_schema;
Expand Down
19 changes: 18 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub enum LogicalPlan {
Analyze(Analyze),
/// Extension operator defined outside of DataFusion
Extension(Extension),
/// Remove duplicate rows from the input
Distinct(Distinct),
}

impl LogicalPlan {
Expand All @@ -110,6 +112,7 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
Expand Down Expand Up @@ -188,6 +191,7 @@ impl LogicalPlan {
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::Distinct(Distinct { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
}
}
Expand Down Expand Up @@ -250,7 +254,8 @@ impl LogicalPlan {
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union(_) => {
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_) => {
vec![]
}
}
Expand All @@ -273,6 +278,7 @@ impl LogicalPlan {
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
LogicalPlan::Distinct(Distinct { input }) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
Expand Down Expand Up @@ -408,6 +414,7 @@ impl LogicalPlan {
}
true
}
LogicalPlan::Distinct(Distinct { input }) => input.accept(visitor)?,
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
subquery.accept(visitor)?
Expand Down Expand Up @@ -853,6 +860,9 @@ impl LogicalPlan {
}) => {
write!(f, "DropTable: {:?} if not exist:={}", name, if_exists)
}
LogicalPlan::Distinct(Distinct { .. }) => {
write!(f, "Distinct:")
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
Expand Down Expand Up @@ -1171,6 +1181,13 @@ pub struct Limit {
pub input: Arc<LogicalPlan>,
}

/// Removes duplicate rows from the input
#[derive(Clone)]
pub struct Distinct {
/// The logical plan that is being DISTINCT'd
pub input: Arc<LogicalPlan>,
}

/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
#[derive(Clone)]
Expand Down
9 changes: 6 additions & 3 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
use crate::logical_plan::builder::build_join_schema;
use crate::logical_plan::{
Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter, Join, Limit,
Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union, Values,
Window,
Aggregate, Analyze, CreateMemoryTable, CreateView, Distinct, Extension, Filter, Join,
Limit, Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
Values, Window,
};
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use arrow::datatypes::{DataType, TimeUnit};
Expand Down Expand Up @@ -477,6 +477,9 @@ pub fn from_plan(
alias: alias.clone(),
}))
}
LogicalPlan::Distinct(Distinct { .. }) => Ok(LogicalPlan::Distinct(Distinct {
input: Arc::new(inputs[0].clone()),
})),
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
assert_eq!(inputs.len(), 1);
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ fn optimize(
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
let expr = plan.expressions();
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ fn optimize_plan(
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
Expand Down
5 changes: 5 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ message LogicalPlanNode {
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
DistinctNode distinct = 23;
}
}

Expand Down Expand Up @@ -232,6 +233,10 @@ message JoinNode {
LogicalExprNode filter = 8;
}

message DistinctNode {
LogicalPlanNode input = 1;
}

message UnionNode {
repeated LogicalPlanNode inputs = 1;
}
Expand Down
24 changes: 22 additions & 2 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ use datafusion_common::{Column, DataFusionError};
use datafusion_expr::{
logical_plan::{
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
CrossJoin, EmptyRelation, Extension, Filter, Join, JoinConstraint, JoinType,
Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
CrossJoin, Distinct, EmptyRelation, Extension, Filter, Join, JoinConstraint,
JoinType, Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values,
Window,
},
Expr, LogicalPlan, LogicalPlanBuilder,
};
Expand Down Expand Up @@ -668,6 +669,11 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec.try_decode(node, &input_plans, ctx)?;
Ok(LogicalPlan::Extension(extension_node))
}
LogicalPlanType::Distinct(distinct) => {
let input: LogicalPlan =
into_logical_plan!(distinct.input, ctx, extension_codec)?;
LogicalPlanBuilder::from(input).distinct()?.build()
}
}
}

Expand Down Expand Up @@ -823,6 +829,20 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
LogicalPlan::Distinct(Distinct { input }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
protobuf::DistinctNode {
input: Some(Box::new(input)),
},
))),
})
}
LogicalPlan::Window(Window {
input, window_expr, ..
}) => {
Expand Down
29 changes: 14 additions & 15 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,17 +1072,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
};

// final projection
let plan = project_with_alias(plan, select_exprs_post_aggr, alias)?;

// process distinct clause
let plan = if select.distinct {
return LogicalPlanBuilder::from(plan)
.aggregate(select_exprs_post_aggr, iter::empty::<Expr>())?
.build();
if select.distinct {
LogicalPlanBuilder::from(plan).distinct()?.build()
} else {
plan
};

// generate the final projection plan
project_with_alias(plan, select_exprs_post_aggr, alias)
Ok(plan)
}
}

/// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
Expand Down Expand Up @@ -3963,12 +3961,13 @@ mod tests {
#[test]
fn union() {
let sql = "SELECT order_id from orders UNION SELECT order_id FROM orders";
let expected = "Projection: #order_id\
\n Aggregate: groupBy=[[#order_id]], aggr=[[]]\
\n Union\n Projection: #orders.order_id\
\n TableScan: orders\
\n Projection: #orders.order_id\
\n TableScan: orders";
let expected = "\
Distinct:\
\n Union\
\n Projection: #orders.order_id\
\n TableScan: orders\
\n Projection: #orders.order_id\
\n TableScan: orders";
quick_test(sql, expected);
}

Expand Down

0 comments on commit 839a618

Please sign in to comment.