107 changes: 50 additions & 57 deletions src/distributed_planner/distributed_physical_optimizer_rule.rs
@@ -1,11 +1,10 @@
use crate::common::require_one_child;
use crate::distributed_planner::plan_annotator::{
AnnotatedPlan, RequiredNetworkBoundary, annotate_plan,
AnnotatedPlan, PlanOrNetworkBoundary, annotate_plan,
};
use crate::{
DistributedConfig, DistributedExec, NetworkCoalesceExec, NetworkShuffleExec, TaskEstimator,
};
use datafusion::common::internal_err;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
@@ -90,68 +89,62 @@ fn distribute_plan(
let d_cfg = DistributedConfig::from_config_options(cfg)?;

let children = annotated_plan.children;
// This is a leaf node, so we need to scale it up with the final task count.
if children.is_empty() {
let scaled_up = d_cfg.__private_task_estimator.scale_up_leaf_node(
&annotated_plan.plan,
annotated_plan.task_count.as_usize(),
cfg,
);
return Ok(scaled_up.unwrap_or(annotated_plan.plan));
}

let parent_task_count = annotated_plan.task_count.as_usize();
let task_count = annotated_plan.task_count.as_usize();
let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max();

let new_children = children
.into_iter()
.map(|child| distribute_plan(child, cfg, query_id, stage_id))
.collect::<Result<Vec<_>, _>>()?;

// It does not need a NetworkBoundary, so just keep recursing.
let Some(nb_req) = annotated_plan.required_network_boundary else {
return annotated_plan.plan.with_new_children(new_children);
};

// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if parent_task_count == 1 && max_child_task_count == Some(1) {
return annotated_plan.plan.with_new_children(new_children);
}

// If the current node has a RepartitionExec below, it needs a shuffle, so put one
// NetworkShuffleExec boundary in between the RepartitionExec and the current node.
if nb_req == RequiredNetworkBoundary::Shuffle {
let new_child = Arc::new(NetworkShuffleExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
parent_task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
return annotated_plan.plan.with_new_children(vec![new_child]);
}

// If this is a CoalescePartitionsExec or a SortPreservingMergeExec, it means that the original
// plan is trying to merge all partitions into one. We need to go one step ahead and also merge
// all distributed tasks into one.
if nb_req == RequiredNetworkBoundary::Coalesce {
let new_child = Arc::new(NetworkCoalesceExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
parent_task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
return annotated_plan.plan.with_new_children(vec![new_child]);
match annotated_plan.plan_or_nb {
// This is a leaf node. It needs to be scaled up in order to account for it running in
// multiple tasks.
PlanOrNetworkBoundary::Plan(plan) if plan.children().is_empty() => {
let scaled_up = d_cfg.__private_task_estimator.scale_up_leaf_node(
&plan,
annotated_plan.task_count.as_usize(),
cfg,
);
Ok(scaled_up.unwrap_or(plan))
}
// This is a normal intermediate plan, just pass it through with the mapped children.
PlanOrNetworkBoundary::Plan(plan) => plan.with_new_children(new_children),
// This is a shuffle, so inject a NetworkShuffleExec here in the plan.
PlanOrNetworkBoundary::Shuffle => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
}
let node = Arc::new(NetworkShuffleExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
}
// DataFusion is trying to coalesce multiple partitions into one, so we should do the
// same with tasks.
PlanOrNetworkBoundary::Coalesce => {
// It would need a network boundary, but on both sides of the boundary there is just 1 task,
// so we are fine with not introducing any network boundary.
if task_count == 1 && max_child_task_count == Some(1) {
return require_one_child(new_children);
}
let node = Arc::new(NetworkCoalesceExec::try_new(
require_one_child(new_children)?,
query_id,
*stage_id,
task_count,
max_child_task_count.unwrap_or(1),
)?);
stage_id.add_assign(1);
Ok(node)
}
}

internal_err!(
"Unreachable code reached in distribute_plan. Could not determine how to place a network boundary below {}",
annotated_plan.plan.name()
)
}
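
For reference, the new match arms rely on PlanOrNetworkBoundary and on the reshaped AnnotatedPlan from plan_annotator.rs, neither of which is shown in this diff. Below is a minimal sketch of their shape, reconstructed purely from how distribute_plan uses them above; variant and field names beyond plan_or_nb, task_count, and children are assumptions, and TaskCount is a placeholder name, so the real definitions may well differ.

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

// Sketch only: inferred from the usage in distribute_plan; the real
// definitions live in src/distributed_planner/plan_annotator.rs.
enum PlanOrNetworkBoundary {
    // A regular plan node (leaf or intermediate), kept as-is.
    Plan(Arc<dyn ExecutionPlan>),
    // A point where a NetworkShuffleExec must be injected.
    Shuffle,
    // A point where a NetworkCoalesceExec must be injected.
    Coalesce,
}

// Placeholder wrapper; the actual crate presumably has its own
// task-count type exposing as_usize().
struct TaskCount(usize);
impl TaskCount {
    fn as_usize(&self) -> usize {
        self.0
    }
}

struct AnnotatedPlan {
    // Either the original plan node or a required network boundary marker.
    plan_or_nb: PlanOrNetworkBoundary,
    // How many distributed tasks this subtree should run in.
    task_count: TaskCount,
    // Annotated children that distribute_plan recurses into.
    children: Vec<AnnotatedPlan>,
}

With a shape like this, the old unreachable internal_err! fallback becomes unnecessary, since the match over plan_or_nb is exhaustive, which is consistent with the removal of the internal_err import at the top of the diff.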

/// Rearranges the [CoalesceBatchesExec] nodes in the plan so that they are placed right below