
Bug: applying EnforceDistribution multiple times generates an invalid plan #14150

@xudong963

Description

Describe the bug

For a TopK query such as select * from aggregate_test_100 ORDER BY c13 limit 5;, applying EnforceDistribution twice generates an invalid plan and produces a wrong result.

The root cause is that the limit's fetch is lost during the second EnforceDistribution pass.
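
To make the failure mode concrete, here is a minimal, self-contained sketch; MergeNode and both rebuild functions are hypothetical stand-ins, not DataFusion's actual operators, and only illustrate how a rebuilt node can silently lose its fetch.

// Hypothetical illustration: a plan node that carries an optional fetch,
// and an optimizer pass whose rebuild step forgets to propagate it.
#[derive(Debug, Clone, PartialEq)]
struct MergeNode {
    fetch: Option<usize>, // Some(5) for `... LIMIT 5`
}

// A correct pass copies the fetch onto the rebuilt node.
fn rebuild_preserving_fetch(node: &MergeNode) -> MergeNode {
    MergeNode { fetch: node.fetch }
}

// A buggy pass constructs a fresh node and drops the fetch.
fn rebuild_dropping_fetch(_node: &MergeNode) -> MergeNode {
    MergeNode { fetch: None }
}

fn main() {
    let plan = MergeNode { fetch: Some(5) };
    let once = rebuild_preserving_fetch(&plan); // first pass: LIMIT 5 survives
    let twice = rebuild_dropping_fetch(&once);  // second pass: LIMIT 5 is gone
    assert_eq!(once.fetch, Some(5));
    assert_eq!(twice.fetch, None);
}

Once the fetch is gone, nothing downstream caps the output anymore, which is consistent with the extra rows seen in the repro below.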

To Reproduce

Here is an example that reproduces the issue:

use std::sync::Arc;
use futures::StreamExt;
use datafusion::prelude::*;
use datafusion::physical_optimizer::{
    coalesce_batches::CoalesceBatches,
    enforce_distribution::EnforceDistribution,
    output_requirements::OutputRequirements,
    PhysicalOptimizerRule,
};
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion::physical_optimizer::limit_pushdown::LimitPushdown;
use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};

#[tokio::main]
async fn main() -> Result<()> {
    // Create a configuration
    let config = SessionConfig::new();
    let ctx = SessionContext::new_with_config(config);

    // Create table schema and data
    // To reproduce the bug, the LOCATION directory should contain more than one
    // copy of aggregate_test_100.csv (so the scan has multiple partitions)
    let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
        c1  VARCHAR NOT NULL,
        c2  TINYINT NOT NULL,
        c3  SMALLINT NOT NULL,
        c4  SMALLINT,
        c5  INT,
        c6  BIGINT NOT NULL,
        c7  SMALLINT NOT NULL,
        c8  INT NOT NULL,
        c9  BIGINT UNSIGNED NOT NULL,
        c10 VARCHAR NOT NULL,
        c11 FLOAT NOT NULL,
        c12 DOUBLE NOT NULL,
        c13 VARCHAR NOT NULL
    )
    STORED AS CSV
    LOCATION './testing/data/csv/' 
    OPTIONS ('format.has_header' 'true')";

    ctx.sql(sql).await?;

    let df = ctx.sql("SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 5").await?;
    let logical_plan = df.logical_plan().clone();
    let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
        logical_plan,
        ctx.state().config_options(),
        |_, _| (),
    )?;

    let optimized_logical_plan = ctx.state().optimizer().optimize(
        analyzed_logical_plan,
        &ctx.state(),
        |_, _| (),
    )?;

    let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        // If there is an output requirement for the query, make sure that
        // this information is not lost across different rules during optimization.
        Arc::new(OutputRequirements::new_add_mode()),
        // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
        // requirements. Please make sure that the whole plan tree is determined before this rule.
        // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
        // least one of the operators in the plan benefits from increased parallelism.
        Arc::new(EnforceDistribution::new()),
        Arc::new(EnforceSorting::new()),
        // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be blocked by CoalesceBatches, so run it before CoalesceBatches; this may be optimized in the future.
        Arc::new(ProjectionPushdown::new()),
        // The CoalesceBatches rule will not influence the distribution and ordering of the
        // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
        Arc::new(CoalesceBatches::new()),
        Arc::new(EnforceDistribution::new()), // Apply the EnforceDistribution rule a second time; this is what triggers the bug
        // Remove the ancillary output requirement operator since we are done with the planning
        // phase.
        Arc::new(OutputRequirements::new_remove_mode()),
        Arc::new(ProjectionPushdown::new()),
        // The LimitPushdown rule tries to push limits down as far as possible,
        // replacing operators with fetching variants, or adding limits
        // past operators that support limit pushdown.
        Arc::new(LimitPushdown::new()),
        // The SanityCheckPlan rule checks whether the order and
        // distribution requirements of each node in the plan
        // are satisfied. It will also reject non-runnable query
        // plans that use pipeline-breaking operators on infinite
        // input(s). The rule generates a diagnostic error
        // message for invalid plans. It makes no changes to the
        // given query plan; i.e. it only acts as a final
        // gatekeeping rule.
        Arc::new(SanityCheckPlan::new()),
    ];

    // Generate the initial physical plan using the custom optimizer rule list above
    let planner = DefaultPhysicalPlanner::default();
    let session_state = SessionStateBuilder::new()
        .with_config(ctx.copied_config())
        .with_default_features()
        .with_physical_optimizer_rules(optimizers)
        .build();
    let optimized_physical_plan = planner
        .create_physical_plan(&optimized_logical_plan, &session_state)
        .await?;

    let mut results = optimized_physical_plan.execute(0, ctx.task_ctx().clone()).unwrap();

    let batch = results.next().await.unwrap()?;
    dbg!(batch.num_rows()); // prints 10 rows: unexpected, since LIMIT 5 should cap the result at 5
    Ok(())
}
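
For debugging, here is a sketch that could be appended at the end of main above (it reuses ctx, session_state, and optimized_physical_plan, so it is not independently runnable): it prints the fully optimized plan, applies EnforceDistribution one extra time by hand, and prints the result again. displayable and PhysicalOptimizerRule::optimize are existing DataFusion APIs; the exact plan text depends on the version, but under this bug the fetch annotation is expected to disappear between the two printouts, making the dropped limit easy to spot.

    use datafusion::physical_plan::displayable;

    // Show the plan, re-run EnforceDistribution manually, and show it again;
    // comparing the two printouts reveals whether the fetch survives the pass.
    println!(
        "before extra pass:\n{}",
        displayable(optimized_physical_plan.as_ref()).indent(true)
    );
    let reoptimized = EnforceDistribution::new()
        .optimize(optimized_physical_plan.clone(), session_state.config_options())?;
    println!(
        "after extra pass:\n{}",
        displayable(reoptimized.as_ref()).indent(true)
    );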

Expected behavior

A valid plan and the correct result should be generated, as described in the documentation: https://github.com/apache/datafusion/blob/main/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L159-L168

Additional context

No response
