Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement hash partitioned aggregation #320

Merged
merged 18 commits into from
May 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ message ProjectionExecNode {
enum AggregateMode {
PARTIAL = 0;
FINAL = 1;
FINAL_PARTITIONED = 2;
}

message HashAggregateExecNode {
Expand Down
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let agg_mode: AggregateMode = match mode {
protobuf::AggregateMode::Partial => AggregateMode::Partial,
protobuf::AggregateMode::Final => AggregateMode::Final,
protobuf::AggregateMode::FinalPartitioned => {
AggregateMode::FinalPartitioned
}
};

let group = hash_agg
Expand Down
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
let agg_mode = match exec.mode() {
AggregateMode::Partial => protobuf::AggregateMode::Partial,
AggregateMode::Final => protobuf::AggregateMode::Final,
AggregateMode::FinalPartitioned => {
protobuf::AggregateMode::FinalPartitioned
}
};
let input_schema = exec.input_schema();
let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
let config = ExecutionConfig::new()
.with_concurrency(1)
.with_repartition_joins(false)
.with_repartition_aggregations(false)
.with_physical_optimizer_rules(rules);
ExecutionContext::with_config(config)
}
7 changes: 2 additions & 5 deletions ballista/rust/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl DistributedPlanner {
//TODO should insert query stages in more generic way based on partitioning metadata
// and not specifically for this operator
match agg.mode() {
AggregateMode::Final => {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in &children {
let new_stage = create_query_stage(
Expand Down Expand Up @@ -237,10 +237,9 @@ mod test {
use ballista_core::serde::protobuf;
use ballista_core::utils::format_plan;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
use std::convert::TryInto;
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -278,11 +277,9 @@ mod test {
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
CsvExec: testdata/lineitem; partitions=2

QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
MergeExec
UnresolvedShuffleExec: stages=[1]

QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
Expand Down
6 changes: 4 additions & 2 deletions ballista/rust/scheduler/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ pub const TPCH_TABLES: &[&str] = &[
pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Dandandan marked this conversation as resolved.
Show resolved Hide resolved
Arc::new(AddMergeExec::new()),
Arc::new(CoalesceBatches::new()),
];
let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
let config = ExecutionConfig::new()
.with_physical_optimizer_rules(rules)
.with_repartition_aggregations(false);
let mut ctx = ExecutionContext::with_config(config);

for table in TPCH_TABLES {
Expand Down
19 changes: 9 additions & 10 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,9 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the join keys to execute joins in parallel
/// using the provided `concurrency` level
pub repartition_joins: bool,
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
/// using the provided `concurrency` level
pub repartition_aggregations: bool,
}

impl ExecutionConfig {
Expand Down Expand Up @@ -663,6 +666,7 @@ impl ExecutionConfig {
create_default_catalog_and_schema: true,
information_schema: false,
repartition_joins: true,
repartition_aggregations: true,
}
}

Expand Down Expand Up @@ -746,6 +750,11 @@ impl ExecutionConfig {
self.repartition_joins = enabled;
self
}
/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.repartition_aggregations = enabled;
self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
Expand Down Expand Up @@ -1351,7 +1360,6 @@ mod tests {
#[tokio::test]
async fn aggregate_grouped() -> Result<()> {
let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);
Dandandan marked this conversation as resolved.
Show resolved Hide resolved

let expected = vec![
"+----+---------+",
Expand All @@ -1371,7 +1379,6 @@ mod tests {
#[tokio::test]
async fn aggregate_grouped_avg() -> Result<()> {
let results = execute("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+---------+",
Expand All @@ -1392,7 +1399,6 @@ mod tests {
async fn boolean_literal() -> Result<()> {
let results =
execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+------+",
Expand All @@ -1414,7 +1420,6 @@ mod tests {
async fn aggregate_grouped_empty() -> Result<()> {
let results =
execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec!["++", "||", "++", "++"];
assert_batches_sorted_eq!(expected, &results);
Expand All @@ -1425,7 +1430,6 @@ mod tests {
#[tokio::test]
async fn aggregate_grouped_max() -> Result<()> {
let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+---------+",
Expand All @@ -1445,7 +1449,6 @@ mod tests {
#[tokio::test]
async fn aggregate_grouped_min() -> Result<()> {
let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+---------+",
Expand Down Expand Up @@ -1629,7 +1632,6 @@ mod tests {
#[tokio::test]
async fn count_aggregated() -> Result<()> {
let results = execute("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----+-----------+",
Expand Down Expand Up @@ -1681,7 +1683,6 @@ mod tests {
&mut ctx,
"SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)",
).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+---------------------+---------+",
Expand Down Expand Up @@ -1925,7 +1926,6 @@ mod tests {
];

let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
Expand All @@ -1952,7 +1952,6 @@ mod tests {
];

let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_optimizer/merge_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl PhysicalOptimizerRule for AddMergeExec {
.collect::<Result<Vec<_>>>()?;
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => plan.with_new_children(children),
Distribution::HashPartitioned(_) => plan.with_new_children(children),
Distribution::SinglePartition => plan.with_new_children(
children
.iter()
Expand Down
5 changes: 4 additions & 1 deletion datafusion/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ fn optimize_concurrency(
.map(|child| {
optimize_concurrency(
concurrency,
plan.required_child_distribution() == Distribution::SinglePartition,
matches!(
plan.required_child_distribution(),
Distribution::SinglePartition
),
child.clone(),
)
})
Expand Down
22 changes: 17 additions & 5 deletions datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ pub enum AggregateMode {
Partial,
/// Final aggregate that produces a single partition of output
Final,
/// Final aggregate that works on pre-partitioned data.
///
/// This requires the invariant that all rows with a particular
/// grouping key are in the same partitions, such as is the case
/// with Hash repartitioning on the group keys. If a group key is
/// duplicated, duplicate groups would be produced
FinalPartitioned,
}

/// Hash aggregate execution plan
Expand Down Expand Up @@ -123,7 +130,7 @@ fn create_schema(
fields.extend(expr.state_fields()?.iter().cloned())
}
}
AggregateMode::Final => {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
// in final mode, the field with the final result of the accumulator
for expr in aggr_expr {
fields.push(expr.field()?)
Expand Down Expand Up @@ -204,6 +211,9 @@ impl ExecutionPlan for HashAggregateExec {
fn required_child_distribution(&self) -> Distribution {
match &self.mode {
AggregateMode::Partial => Distribution::UnspecifiedDistribution,
AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
self.group_expr.iter().map(|x| x.0.clone()).collect(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

),
AggregateMode::Final => Distribution::SinglePartition,
}
}
Expand Down Expand Up @@ -454,7 +464,7 @@ fn group_aggregate_batch(
})
.try_for_each(|(accumulator, values)| match mode {
AggregateMode::Partial => accumulator.update_batch(&values),
AggregateMode::Final => {
AggregateMode::FinalPartitioned | AggregateMode::Final => {
// note: the aggregation here is over states, not values, thus the merge
accumulator.merge_batch(&values)
}
Expand Down Expand Up @@ -807,7 +817,7 @@ fn aggregate_expressions(
Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
}
// in this mode, we build the merge expressions of the aggregation
AggregateMode::Final => Ok(aggr_expr
AggregateMode::Final | AggregateMode::FinalPartitioned => Ok(aggr_expr
.iter()
.map(|agg| merge_expressions(agg))
.collect::<Result<Vec<_>>>()?),
Expand Down Expand Up @@ -901,7 +911,9 @@ fn aggregate_batch(
// 1.3
match mode {
AggregateMode::Partial => accum.update_batch(values),
AggregateMode::Final => accum.merge_batch(values),
AggregateMode::Final | AggregateMode::FinalPartitioned => {
accum.merge_batch(values)
}
}
})
}
Expand Down Expand Up @@ -1074,7 +1086,7 @@ fn finalize_aggregation(
.collect::<Result<Vec<_>>>()?;
Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
}
AggregateMode::Final => {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
// merge the state to the final value
accumulators
.iter()
Expand Down
Loading