Skip to content

Commit

Permalink
Add SQL query planner support for DISTRIBUTE BY (#3208)
Browse files Browse the repository at this point in the history
* Add SQL query planner support for DISTRIBUTE BY

* fmt
  • Loading branch information
andygrove authored Aug 20, 2022
1 parent fec33b1 commit c8d61d8
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 4 deletions.
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,9 @@ impl DefaultPhysicalPlanner {
.collect::<Result<Vec<_>>>()?;
Partitioning::Hash(runtime_expr, *n)
}
LogicalPartitioning::DistributeBy(_) => {
return Err(DataFusionError::NotImplemented("Physical plan does not support DistributeBy partitioning".to_string()))
}
};
Ok(Arc::new(RepartitionExec::try_new(
physical_input,
Expand Down
12 changes: 11 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,15 @@ impl LogicalPlan {
n
)
}
Partitioning::DistributeBy(expr) => {
let dist_by_expr: Vec<String> =
expr.iter().map(|e| format!("{:?}", e)).collect();
write!(
f,
"Repartition: DistributeBy({})",
dist_by_expr.join(", "),
)
}
},
LogicalPlan::Limit(Limit {
ref skip,
Expand Down Expand Up @@ -1390,8 +1399,9 @@ pub enum Partitioning {
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one of more expressions and the specified number
/// of partitions.
/// This partitioning scheme is not yet fully supported. See <https://issues.apache.org/jira/browse/ARROW-11011>
Hash(Vec<Expr>, usize),
/// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
DistributeBy(Vec<Expr>),
}

/// Represents which type of plan, when storing multiple
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ pub fn from_plan(
partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
input: Arc::new(inputs[0].clone()),
})),
Partitioning::DistributeBy(_) => Ok(LogicalPlan::Repartition(Repartition {
partitioning_scheme: Partitioning::DistributeBy(expr.to_owned()),
input: Arc::new(inputs[0].clone()),
})),
},
LogicalPlan::Window(Window {
window_expr,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,11 @@ impl AsLogicalPlan for LogicalPlanNode {
Partitioning::RoundRobinBatch(partition_count) => {
PartitionMethod::RoundRobin(*partition_count as u64)
}
Partitioning::DistributeBy(_) => {
return Err(DataFusionError::NotImplemented(
"DistributeBy".to_string(),
))
}
};

Ok(protobuf::LogicalPlanNode {
Expand Down
43 changes: 40 additions & 3 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::{
Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
DropTable, Explain, FileType, JoinType, LogicalPlan, LogicalPlanBuilder, PlanType,
ToStringifiedPlan,
DropTable, Explain, FileType, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning, PlanType, ToStringifiedPlan,
};
use datafusion_expr::utils::{
can_hash, expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
Expand Down Expand Up @@ -1009,6 +1009,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias: Option<String>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
// check for unsupported syntax first
if !select.cluster_by.is_empty() {
return Err(DataFusionError::NotImplemented("CLUSTER BY".to_string()));
}
if !select.lateral_views.is_empty() {
return Err(DataFusionError::NotImplemented("LATERAL VIEWS".to_string()));
}
if select.qualify.is_some() {
return Err(DataFusionError::NotImplemented("QUALIFY".to_string()));
}
if select.top.is_some() {
return Err(DataFusionError::NotImplemented("TOP".to_string()));
}

// process `from` clause
let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));
Expand Down Expand Up @@ -1154,10 +1168,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = project_with_alias(plan, select_exprs_post_aggr, alias)?;

// process distinct clause
if select.distinct {
let plan = if select.distinct {
LogicalPlanBuilder::from(plan).distinct()?.build()
} else {
Ok(plan)
}?;

// DISTRIBUTE BY
if !select.distribute_by.is_empty() {
let x = select
.distribute_by
.iter()
.map(|e| self.sql_expr_to_logical_expr(e.clone(), &combined_schema, ctes))
.collect::<Result<Vec<_>>>()?;
LogicalPlanBuilder::from(plan)
.repartition(Partitioning::DistributeBy(x))?
.build()
} else {
Ok(plan)
}
}

Expand Down Expand Up @@ -4936,6 +4964,15 @@ mod tests {
quick_test(sql, expected);
}

#[test]
fn test_distribute_by() {
let sql = "select id from person distribute by state";
let expected = "Repartition: DistributeBy(#state)\
\n Projection: #person.id\
\n TableScan: person";
quick_test(sql, expected);
}

#[test]
fn test_double_quoted_literal_string() {
// Assert double quoted literal string is parsed correctly like single quoted one in specific dialect.
Expand Down

0 comments on commit c8d61d8

Please sign in to comment.