Moving PipelineFixer above all rules to use ExecutionPlan APIs #5880

Merged · 15 commits · Apr 6, 2023
24 changes: 12 additions & 12 deletions datafusion/core/src/execution/context.rs
@@ -1285,6 +1285,18 @@ impl SessionState {
// We need to take care of the rule ordering. They may influence each other.
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
+ // Statistics-based join selection will change the Auto mode to a real join implementation,
+ // like collect left, or hash join, or future sort merge join, which will influence the
+ // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
+ // repartitioning and local sorting steps to meet distribution and ordering requirements.
+ // Therefore, it should run before EnforceDistribution and EnforceSorting.
+ Arc::new(JoinSelection::new()),
+ // If the query is processing infinite inputs, the PipelineFixer rule applies the
+ // necessary transformations to make the query runnable (if it is not already runnable).
+ // If the query can not be made runnable, the rule emits an error with a diagnostic message.
+ // Since the transformations it applies may alter output partitioning properties of operators
+ // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
+ Arc::new(PipelineFixer::new()),
// In order to increase the parallelism, the Repartition rule will change the
// output partitioning of some operators in the plan tree, which will influence
// other rules. Therefore, it should run as soon as possible. It is optional because:
@@ -1299,18 +1311,6 @@
// - Since it will change the output ordering of some operators, it should run
// before JoinSelection and EnforceSorting, which may depend on that.
Arc::new(GlobalSortSelection::new()),
- // Statistics-based join selection will change the Auto mode to a real join implementation,
- // like collect left, or hash join, or future sort merge join, which will influence the
- // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
- // repartitioning and local sorting steps to meet distribution and ordering requirements.
- // Therefore, it should run before EnforceDistribution and EnforceSorting.
- Arc::new(JoinSelection::new()),
- // If the query is processing infinite inputs, the PipelineFixer rule applies the
- // necessary transformations to make the query runnable (if it is not already runnable).
- // If the query can not be made runnable, the rule emits an error with a diagnostic message.
- // Since the transformations it applies may alter output partitioning properties of operators
- // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
- Arc::new(PipelineFixer::new()),
// The EnforceDistribution rule is for adding essential repartition to satisfy the required
// distribution. Please make sure that the whole plan tree is determined before this rule.
Arc::new(EnforceDistribution::new()),
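To make the reordering easier to read outside the diff, here is a condensed sketch of the resulting rule list, assembled from the two hunks above. This is illustrative rather than the full list: the rules elided between GlobalSortSelection and EnforceDistribution keep their previous slots, and the surrounding SessionState context and imports are assumed.

```rust
// Condensed sketch of the post-change optimizer ordering (not the full list).
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
    Arc::new(AggregateStatistics::new()),
    // Chooses a concrete join implementation, so it must precede the
    // rules that enforce distribution and ordering requirements.
    Arc::new(JoinSelection::new()),
    // May swap hash join sides (changing output partitioning), so it
    // must also precede EnforceDistribution.
    Arc::new(PipelineFixer::new()),
    // ... Repartition and GlobalSortSelection run here, as before ...
    Arc::new(EnforceDistribution::new()),
    // ... EnforceSorting and the remaining rules follow ...
];
```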
101 changes: 67 additions & 34 deletions datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -380,6 +380,10 @@ impl ExecutionPlan for SymmetricHashJoinExec {
Ok(children.iter().any(|u| *u))
}

+ fn benefits_from_input_partitioning(&self) -> bool {
+     false
+ }

fn required_input_distribution(&self) -> Vec<Distribution> {
let (left_expr, right_expr) = self
.on
@@ -388,16 +392,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
.unzip();
// TODO: This will change when we extend collected executions.
vec![
- if self.left.output_partitioning().partition_count() == 1 {
-     Distribution::SinglePartition
- } else {
-     Distribution::HashPartitioned(left_expr)
- },
- if self.right.output_partitioning().partition_count() == 1 {
-     Distribution::SinglePartition
- } else {
-     Distribution::HashPartitioned(right_expr)
- },
+ Distribution::HashPartitioned(left_expr),
+ Distribution::HashPartitioned(right_expr),
]
}
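Together, the two additions above shift partitioning decisions onto the ExecutionPlan APIs: the join now always declares a hash-partitioning requirement on its keys and opts out of benefits_from_input_partitioning, so its parallelism comes from the enforced hash repartitioning rather than from round-robin repartitioning of its inputs. A minimal sketch of how a rule can inspect this contract follows; the helper name is illustrative, and the types are assumed to come from datafusion::physical_plan as used elsewhere in this file.

```rust
use std::sync::Arc;
use datafusion::physical_plan::{Distribution, ExecutionPlan};

// Illustrative helper: report which children must be hash-partitioned to
// satisfy the requirements an operator declares (as SymmetricHashJoinExec
// now does unconditionally for both of its inputs).
fn hash_partitioned_children(plan: &Arc<dyn ExecutionPlan>) -> Vec<bool> {
    plan.required_input_distribution()
        .into_iter()
        .map(|dist| matches!(dist, Distribution::HashPartitioned(_)))
        .collect()
}
```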

@@ -1509,7 +1505,7 @@ mod tests {
hash_join_utils::tests::complicated_filter, HashJoinExec, PartitionMode,
};
use crate::physical_plan::{
- collect, common, memory::MemoryExec, repartition::RepartitionExec,
+ common, memory::MemoryExec, repartition::RepartitionExec,
};
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::register_unbounded_file_with_ordering;
@@ -2182,9 +2178,9 @@ mod tests {
Ok(())
}

- #[tokio::test(flavor = "multi_thread")]
+ #[tokio::test]
async fn join_change_in_planner() -> Result<()> {
- let config = SessionConfig::new().with_target_partitions(1);
+ let config = SessionConfig::new().with_target_partitions(8);
let ctx = SessionContext::with_config(config);
let tmp_dir = TempDir::new().unwrap();
let left_file_path = tmp_dir.path().join("left.csv");
@@ -2225,21 +2221,39 @@
true,
)
.await?;
- let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
- let physical_plan = df.create_physical_plan().await?;
- let task_ctx = ctx.task_ctx();
- let results = collect(physical_plan.clone(), task_ctx).await.unwrap();
- let formatted = pretty_format_batches(&results).unwrap().to_string();
- let found = formatted
-     .lines()
-     .any(|line| line.contains("SymmetricHashJoinExec"));
- assert!(found);
+ let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted = crate::physical_plan::displayable(physical_plan.as_ref())
+     .indent()
+     .to_string();
+ let expected = {
+     [
+         "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
+         " CoalesceBatchesExec: target_batch_size=8192",
+         " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+         // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+         " CoalesceBatchesExec: target_batch_size=8192",
+         " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+         // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+     ]
+ };
+ let mut actual: Vec<&str> = formatted.trim().lines().collect();
+ // Remove CSV lines
+ actual.remove(3);
+ actual.remove(5);
+
+ assert_eq!(
+     expected,
+     actual[..],
+     "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn join_change_in_planner_without_sort() -> Result<()> {
- let config = SessionConfig::new().with_target_partitions(1);
+ let config = SessionConfig::new().with_target_partitions(8);
let ctx = SessionContext::with_config(config);
let tmp_dir = TempDir::new()?;
let left_file_path = tmp_dir.path().join("left.csv");
@@ -2262,22 +2276,41 @@
CsvReadOptions::new().schema(&schema).mark_infinite(true),
)
.await?;
- let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
- let physical_plan = df.create_physical_plan().await?;
- let task_ctx = ctx.task_ctx();
- let results = collect(physical_plan.clone(), task_ctx).await?;
- let formatted = pretty_format_batches(&results)?.to_string();
- let found = formatted
-     .lines()
-     .any(|line| line.contains("SymmetricHashJoinExec"));
- assert!(found);
+ let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted = crate::physical_plan::displayable(physical_plan.as_ref())
+     .indent()
+     .to_string();
+ let expected = {
+     [
+         "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
+         " CoalesceBatchesExec: target_batch_size=8192",
+         " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+         // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+         " CoalesceBatchesExec: target_batch_size=8192",
+         " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+         // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+     ]
+ };
+ let mut actual: Vec<&str> = formatted.trim().lines().collect();
+ // Remove CSV lines
+ actual.remove(3);
+ actual.remove(5);
+
+ assert_eq!(
+     expected,
+     actual[..],
+     "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
Ok(())
}
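Both tests above follow the same pattern: build the physical plan, render it with displayable(...).indent(), and compare the lines against the expected operator tree while skipping the CsvExec entries, whose temp-file paths differ on every run. Note the index arithmetic in the removals: after actual.remove(3) deletes the first CsvExec line, the second one (originally at index 6) has shifted down to index 5. A hedged sketch of the same filtering step written by content rather than by position, which avoids the shifting indices (`formatted` is the indented plan string from above):

```rust
// Drop the path-bearing CsvExec lines by content instead of by index, so
// the comparison stays correct even if operators are added or reordered.
let actual: Vec<&str> = formatted
    .trim()
    .lines()
    .filter(|line| !line.trim_start().starts_with("CsvExec:"))
    .collect();
```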

#[tokio::test(flavor = "multi_thread")]
async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
- let config =
-     SessionConfig::new().with_allow_symmetric_joins_without_pruning(false);
+ let config = SessionConfig::new()
+     .with_target_partitions(8)
+     .with_allow_symmetric_joins_without_pruning(false);
let ctx = SessionContext::with_config(config);
let tmp_dir = TempDir::new()?;
let left_file_path = tmp_dir.path().join("left.csv");
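The rest of this test is truncated here, but the configuration change is the interesting part: with_allow_symmetric_joins_without_pruning(false) asks the planner to reject symmetric hash joins whose filters cannot prune internal buffers, as the test name suggests. A hedged sketch of exercising that gate (the SQL and error handling are illustrative, not taken from the truncated body):

```rust
// Hedged sketch: with the pruning gate disabled, planning a symmetric hash
// join over unbounded inputs whose filter cannot bound the join state is
// expected to fail rather than yield an unbounded-memory plan.
let config = SessionConfig::new()
    .with_target_partitions(8)
    .with_allow_symmetric_joins_without_pruning(false);
let ctx = SessionContext::with_config(config);
// let err = ctx.sql("SELECT ... FULL JOIN ...").await?
//     .create_physical_plan().await
//     .expect_err("unprunable symmetric join should be rejected");
```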