Skip to content

Internal error: PhysicalExpr Column references bound error, Failure in spilling for AggregateMode::Single #15530

@rluvaton

Description

@rluvaton

Describe the bug

when using aggregate exec with single mode, and spilling and the group by expressions are not the first expressions from the previous plan there will be schema mismatch

To Reproduce

#[cfg(test)]
mod tests {
    use std::fmt::{Display, Formatter};
    use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::common::Result;
    use datafusion::execution::memory_pool::FairSpillPool;
    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
    use datafusion::execution::TaskContext;
    use datafusion::functions_aggregate::sum::sum_udaf;
    use datafusion::physical_expr::aggregate::AggregateExprBuilder;
    use datafusion::physical_expr::expressions::{lit, Column};
    use datafusion::physical_plan::aggregates::{PhysicalGroupBy, AggregateExec, AggregateMode};
    use datafusion::physical_plan::common::collect;
    use datafusion::physical_plan::ExecutionPlan;
    use rand::{random, thread_rng, Rng};
    use std::sync::Arc;
    use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
    use parking_lot::RwLock;

    #[tokio::test]
    async fn test_debug() -> Result<()> {
        let scan_schema = Arc::new(Schema::new(vec![
            Field::new("col_0", DataType::Int64, true),
            Field::new("col_1", DataType::Utf8, true),
            Field::new("col_2", DataType::Utf8, true),
            Field::new("col_3", DataType::Utf8, true),
            Field::new("col_4", DataType::Utf8, true),
            Field::new("col_5", DataType::Int32, true),
            Field::new("col_6", DataType::Utf8, true),
            Field::new("col_7", DataType::Utf8, true),
            Field::new("col_8", DataType::Utf8, true),
        ]));

        let group_by = PhysicalGroupBy::new_single(vec![
            (Arc::new(Column::new("col_1", 1)), "col_1".to_string()),
            (Arc::new(Column::new("col_7", 7)), "col_7".to_string()),
            (Arc::new(Column::new("col_0", 0)), "col_0".to_string()),
            (Arc::new(Column::new("col_8", 8)), "col_8".to_string()),
        ]);

        fn generate_int64_array() -> ArrayRef {
            Arc::new(Int64Array::from_iter_values(
                (0..8192).map(|_| random::<i64>()),
            ))
        }
        fn generate_int32_array() -> ArrayRef {
            Arc::new(Int32Array::from_iter_values(
                (0..8192).map(|_| random::<i32>()),
            ))
        }

        fn generate_string_array() -> ArrayRef {
            Arc::new(StringArray::from(
                (0..8192)
                    .map(|_| -> String {
                        thread_rng()
                            .sample_iter::<char, _>(rand::distributions::Standard)
                            .take(10)
                            .collect()
                    })
                    .collect::<Vec<_>>(),
            ))
        }

        fn generate_record_batch(schema: &SchemaRef) -> Result<RecordBatch> {
            RecordBatch::try_new(
                Arc::clone(&schema),
                vec![
                    generate_int64_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_int32_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_string_array(),
                ],
            )
            .map_err(|err| err.into())
        }

        let aggregate_expressions = vec![Arc::new(
            AggregateExprBuilder::new(sum_udaf(), vec![lit(1i64)])
                .schema(Arc::clone(&scan_schema))
                .alias("SUM(1i64)")
                .build()?,
        )];

        #[derive(Debug)]
        struct Generator {
            index: usize,
            count: usize,
            schema: SchemaRef,
        }

        impl Display for Generator {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                write!(f, "Generator")
            }
        }

        impl LazyBatchGenerator for Generator {
            fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
                if self.index > self.count {
                    return Ok(None);
                }

                let batch = generate_record_batch(&self.schema)?;
                self.index += 1;

                Ok(Some(batch))
            }
        }

        let generator = Generator {
            index: 0,
            count: 10,
            schema: Arc::clone(&scan_schema),
        };

        let plan: Arc<dyn ExecutionPlan> =
          Arc::new(LazyMemoryExec::try_new(Arc::clone(&scan_schema), vec![Arc::new(RwLock::new(generator))])?);

        let single_aggregate = Arc::new(AggregateExec::try_new(
            AggregateMode::Single,
            group_by,
            aggregate_expressions.clone(),
            vec![None; aggregate_expressions.len()],
            plan,
            Arc::clone(&scan_schema),
        )?);

        let memory_pool = Arc::new(FairSpillPool::new(10006216));
        let task_ctx = Arc::new(
            TaskContext::default().with_runtime(Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(memory_pool)
                    .build()?,
            )),
        );

        let res = collect(single_aggregate.execute(0, Arc::clone(&task_ctx))?).await;

        match res {
            Ok(_) => println!("Success"),
            Err(e) => {
                println!("Error: {}", e);

                return Err(e);
            },
        }

        Ok(())
    }
}

The following error happen:

Error: Internal error: PhysicalExpr Column references column 'col_7' at index 7 (zero-based) but input schema only has 5 columns: ["col_1", "col_7", "col_0", "col_8", "SUM(1i64)[sum]"]

backtrace:    0: std::backtrace_rs::backtrace::libunwind::trace
             ...
   1: std::backtrace_rs::backtrace::trace_unsynchronized
             at ...
   2: std::backtrace::Backtrace::create
             at ...
   3: datafusion_common::error::DataFusionError::get_back_trace
             at <crates>/datafusion-common-46.0.1/src/error.rs:473:30
   4: datafusion_physical_expr::expressions::column::Column::bounds_check
             at <crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:147:13
   5: <datafusion_physical_expr::expressions::column::Column as datafusion_physical_expr_common::physical_expr::PhysicalExpr>::evaluate
             at <crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:126:9
   6: datafusion_physical_plan::aggregates::evaluate_group_by::{{closure}}
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1305:25
   7: core::iter::adapters::map::map_try_fold::{{closure}}
             at ...
             
  8-21: std stuff
  
  22: core::iter::traits::iterator::Iterator::collect
             at ...
  23: datafusion_physical_plan::aggregates::evaluate_group_by
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1301:32
  24: datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:821:13
  25: <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:697:29
             
  26-28: futures stuff
  
  29: datafusion_physical_plan::common::collect::{{closure}}
             at <crates>/datafusion-physical-plan-46.0.1/src/common.rs:45:36
  30: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
             at ./src/repro_bug_agg.rs:145:99
             
  31-51: std and tokio stuff
  
  52: datafusion_pg::repro_bug_agg::tests::test_debug
             at ./src/repro_bug_agg.rs:147:9
  53: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
             at ./src/repro_bug_agg.rs:23:30
.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

Expected behavior

should not fail

Additional context

The spill schema is:

spill_state.spill_schema: Schema {
    fields: [
        Field {
            name: "col_1",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "col_7",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "col_0",
            data_type: Int64,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "col_8",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "SUM(1i64)[sum]",
            data_type: Int64,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
    ],
    metadata: {},
}

the issue is that the spilling schema is the output schema of the intermediate results while the group by expressions are the same and because column point to an index rather than by name the index now does not exists

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions