
Schema error when spilling with multiple aggregations #13949

Open

Friede80 opened this issue Dec 30, 2024 · 2 comments · May be fixed by #13995
Labels
bug Something isn't working

@Friede80

Describe the bug

There is an issue when using multiple aggregations and triggering a spill in a GroupedHashAggregateStream.

A query like SELECT MIN(b), MAX(b) FROM table GROUP BY a results in an error

ArrowError(InvalidArgumentError("number of columns(3) must match number of fields(2) in schema"), None)

The problematic schema is captured here:

// Use input batch (Partial mode) schema for spilling because
// the spilled data will be merged and re-evaluated later.
self.spill_state.spill_schema = batch.schema();

The error is then thrown from emit here:
let batch = RecordBatch::try_new(schema, output)?;

The spill path captures the schema of the input batch and then reuses it for the intermediate aggregate state that gets spilled to disk. These schemas are clearly not going to match: the state batch has three columns (the group key a plus one accumulator state column per aggregate), while the input schema has only the two fields a and b, which is exactly the columns(3) vs. fields(2) mismatch above. But I can't quite grok where exactly things have gone wrong.
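
For illustration, the mismatch can be reproduced with arrow alone. This is just a minimal sketch (the column values are invented, and the layout of the real accumulator state may differ): it builds a three-column "state" batch and pairs it with the two-field input schema, which fails with the same error.

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Input schema of the table: two fields, `a` and `b`.
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::UInt32, false),
        Field::new("b", DataType::Float64, false),
    ]));

    // A stand-in for the emitted intermediate state of `GROUP BY a` with
    // MIN(b) and MAX(b): the group key plus one state column per aggregate.
    // (Values are made up purely for illustration.)
    let state_columns: Vec<ArrayRef> = vec![
        Arc::new(UInt32Array::from(vec![2, 3, 4])),        // group key `a`
        Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])), // MIN(b) state
        Arc::new(Float64Array::from(vec![1.0, 3.0, 4.0])), // MAX(b) state
    ];

    // Pairing the two-field input schema with a three-column batch fails with
    // "number of columns(3) must match number of fields(2) in schema".
    let err = RecordBatch::try_new(input_schema, state_columns).unwrap_err();
    println!("{err}");
}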

To Reproduce

Below is a minimal example that should be analogous to a query like SELECT MIN(b), MAX(b) FROM table GROUP BY a. The batch_size and memory pool size are set small enough to trigger a spill.

use arrow::array::{Float64Array, RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::{
    execution::{memory_pool::FairSpillPool, TaskContext},
    physical_expr::aggregate::AggregateExprBuilder,
    physical_plan::{
        aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
        common,
        expressions::col,
        memory::MemoryExec,
        udaf::AggregateFunctionExpr,
        ExecutionPlan,
    },
    prelude::SessionConfig,
};
use datafusion_common::Result;
use std::sync::Arc;

#[tokio::test]
async fn reproduce_spill_schema_error() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::UInt32, false),
        Field::new("b", DataType::Float64, false),
    ]));

    let batches = vec![
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
            ],
        )?,
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
            ],
        )?,
    ];
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(MemoryExec::try_new(&[batches], schema.clone(), None)?);

    let grouping_set = PhysicalGroupBy::new(
        vec![(col("a", &schema)?, "a".to_string())],
        vec![],
        vec![vec![false]],
    );

    let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![
        Arc::new(
            AggregateExprBuilder::new(min_udaf(), vec![col("b", &schema)?])
                .schema(schema.clone())
                .alias("MIN(b)")
                .build()?,
        ),
        Arc::new(
            AggregateExprBuilder::new(max_udaf(), vec![col("b", &schema)?])
                .schema(schema.clone())
                .alias("MAX(b)")
                .build()?,
        ),
    ];

    let single_aggregate = Arc::new(AggregateExec::try_new(
        AggregateMode::Single,
        grouping_set,
        aggregates,
        vec![None, None],
        plan,
        schema.clone(),
    )?);

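    // Small batch size plus a tight memory pool force the aggregation to spill.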
    let batch_size = 2;
    let memory_pool = Arc::new(FairSpillPool::new(1600));
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(SessionConfig::new().with_batch_size(batch_size))
            .with_runtime(Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(memory_pool)
                    .build()?,
            )),
    );

    let _result =
        common::collect(single_aggregate.execute(0, Arc::clone(&task_ctx))?).await?;
    Ok(())
}

Expected behavior

I expect the test to complete successfully.

Additional context

No response

Friede80 added the bug label on Dec 30, 2024
@kosiew (Contributor) commented Dec 31, 2024

take

@alamb (Contributor) commented Dec 31, 2024

Any work we can do to improve the spilling code would also be most appreciated.

Thanks for the report @Friede80 and the take @kosiew
