Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,35 @@ use crate::physical_optimizer::test_utils::{
};
use crate::physical_optimizer::test_utils::{parquet_exec_with_sort, trim_plan_display};

use crate::sql::ExplainNormalizer;
use arrow::compute::SortOptions;
use datafusion::config::ConfigOptions;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::{
expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion_physical_optimizer::enforce_distribution::*;
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
Expand All @@ -62,6 +71,7 @@ use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::PlanProperties;
use datafusion_physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
use futures::StreamExt;

/// Models operators like BoundedWindowExec that require an input
/// ordering but is easy to construct
Expand Down Expand Up @@ -3154,3 +3164,94 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn apply_enforce_distribution_multiple_times() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is nice to have this "end to end" style test, but given the amount of code changed, I think it is important to have more "unit style" tests; otherwise it is hard to understand how general this fix is (or whether it just works for the specified query).

I wonder if you could construct some cases using the same framework as the tests above? Aka make a plan and then run EnforceDistribution twice on it and ensure the plans are ok?

Or perhaps you can update the assert_optimized! to ensure that running EnforceDistribution twice doesn't change the plan again

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides this test, a data test in the .slt files would still be helpful, IMO.

// Create a configuration
let config = SessionConfig::new();
let ctx = SessionContext::new_with_config(config);
let testdata = datafusion::test_util::arrow_test_data();
let csv_file = format!("{testdata}/csv/aggregate_test_100.csv");
// Create table schema and data
let sql = format!(
"CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
c4 SMALLINT,
c5 INT,
c6 BIGINT NOT NULL,
c7 SMALLINT NOT NULL,
c8 INT NOT NULL,
c9 BIGINT UNSIGNED NOT NULL,
c10 VARCHAR NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
)
STORED AS CSV
LOCATION '{csv_file}'
OPTIONS ('format.has_header' 'true')"
);

ctx.sql(sql.as_str()).await?;

let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
let logical_plan = df.logical_plan().clone();
let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
logical_plan,
ctx.state().config_options(),
|_, _| (),
)?;
let optimized_logical_plan = ctx.state().optimizer().optimize(
analyzed_logical_plan,
&ctx.state(),
|_, _| (),
)?;

let planner = DefaultPhysicalPlanner::default();
let session_state = SessionStateBuilder::new()
.with_config(ctx.copied_config())
.with_default_features()
// The second `EnforceDistribution` should be run with `OutputRequirements` to reproduce the bug.
.with_physical_optimizer_rule(Arc::new(OutputRequirements::new_add_mode()))
.with_physical_optimizer_rule(Arc::new(EnforceDistribution::new())) // -- Add enforce distribution rule again
.with_physical_optimizer_rule(Arc::new(OutputRequirements::new_remove_mode()))
.build();
let optimized_physical_plan = planner
.create_physical_plan(&optimized_logical_plan, &session_state)
.await?;

let normalizer = ExplainNormalizer::new();
let actual = format!(
"{}",
displayable(optimized_physical_plan.as_ref()).indent(true)
)
.trim()
.lines()
// normalize paths
.map(|s| normalizer.normalize(s))
.collect::<Vec<_>>();
// Test the optimized plan is correct (after twice `EnforceDistribution`)
// The `fetch` is maintained after the second `EnforceDistribution`
let expected = vec![
"SortExec: TopK(fetch=5), expr=[c13@12 ASC NULLS LAST], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" UnionExec",
" SortExec: TopK(fetch=5), expr=[c13@12 ASC NULLS LAST], preserve_partitioning=[false]",
" DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], file_type=csv, has_header=true",
" SortExec: TopK(fetch=5), expr=[c13@12 ASC NULLS LAST], preserve_partitioning=[false]",
" DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], file_type=csv, has_header=true",
];
assert_eq!(
expected, actual,
"expected:\n{expected:#?}\nactual:\n\n{actual:#?}\n"
);

let mut results = optimized_physical_plan.execute(0, ctx.task_ctx().clone())?;

let batch = results.next().await.unwrap()?;
// Without the fix of https://github.com/apache/datafusion/pull/14207, the number of rows will be 10
assert_eq!(batch.num_rows(), 5);
Ok(())
}
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub struct ExplainNormalizer {
}

impl ExplainNormalizer {
fn new() -> Self {
pub(crate) fn new() -> Self {
let mut replacements = vec![];

let mut push_path = |path: PathBuf, key: &str| {
Expand All @@ -266,7 +266,7 @@ impl ExplainNormalizer {
Self { replacements }
}

fn normalize(&self, s: impl Into<String>) -> String {
pub(crate) fn normalize(&self, s: impl Into<String>) -> String {
let mut s = s.into();
for (from, to) in &self.replacements {
s = s.replace(from, to);
Expand Down
Loading
Loading