Skip to content

Partial AggregateMode will generate duplicate field names which will fail DFSchema construct #17662

@zhuqi-lucas

Description

@zhuqi-lucas

Describe the bug

After the PR since apache datafusion 50.0.0, we will have name check for DFSchema construct:

#17189

Details:
https://github.com/apache/datafusion/pull/17189/files#r2277001021

But Partial AggregateMode will generate duplicate field names which will fail DFSchema construct, in our production we use lower level to do this, so we need to make Partial AggregateMode not generate duplicate field names.

To Reproduce

Reproduced code:

#[test]
fn test_duplicate_state_fields_fails_for_dfschema_construct() -> Result<()> {
    let ctx = SessionContext::new();

    // Simple schema with just the fields we need
    let file_schema = Arc::new(Schema::new(vec![
        Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
            true,
        ),
        Field::new("ticker", DataType::Utf8, true),
        Field::new("value", DataType::Float64, true),
        Field::new("date", DataType::Utf8, false),
    ]));


    let df_schema = DFSchema::try_from(file_schema.clone())?;

    let timestamp = col("timestamp");
    let value = col("value");
    let ticker = col("ticker");
    let date = col("date");

    // Create a simple mock execution plan (you can replace this with EmptyExec if needed)
    let mock_exec = Arc::new(EmptyExec::new(file_schema.clone()));

    // Build first_value aggregate
    let first_value = Arc::new(
        AggregateExprBuilder::new(
            datafusion_functions_aggregate::first_last::first_value_udaf(),
            vec![ctx.create_physical_expr(value.clone(), &df_schema)?],
        )
            .alias("first_value(value)")
            .order_by(vec![PhysicalSortExpr::new(
                ctx.create_physical_expr(timestamp.clone(), &df_schema)?,
                SortOptions::new(false, false),
            )])
            .schema(file_schema.clone())
            .build()
            .expect("Failed to build first_value"),
    );

    // Build last_value aggregate
    let last_value = Arc::new(
        AggregateExprBuilder::new(
            datafusion_functions_aggregate::first_last::last_value_udaf(),
            vec![ctx.create_physical_expr(value.clone(), &df_schema)?],
        )
            .alias("last_value(value)")
            .order_by(vec![PhysicalSortExpr::new(
                ctx.create_physical_expr(timestamp.clone(), &df_schema)?,
                SortOptions::new(false, false),
            )])
            .schema(file_schema.clone())
            .build()
            .expect("Failed to build last_value"),
    );

    let partial_agg = AggregateExec::try_new(
        AggregateMode::Partial,
        PhysicalGroupBy::new_single(vec![
            (
                ctx.create_physical_expr(date.clone(), &df_schema)?,
                "date".to_string(),
            ),
            (
                ctx.create_physical_expr(ticker.clone(), &df_schema)?,
                "ticker".to_string(),
            ),
        ]),
        vec![first_value, last_value], // Both aggregates together
        vec![None, None],
        mock_exec,
        file_schema,
    ).expect("Failed to build partial agg");

    println!("{:?}", partial_agg.schema());

    for field in partial_agg.schema().fields() {
        println!("Field: {}", field.name());
    }

    let partial_agg_exec_schema = DFSchema::try_from(partial_agg.schema());

    // This should fail due to duplicate state field names
    assert!(
        partial_agg_exec_schema.is_err(),
        "Expected get AggregateExec to fail due to duplicate state field names"
    );

    if let Err(e) = partial_agg_exec_schema {
        println!("Expected error due to duplicate state fields: {}", e);
        // Verify it's the specific duplicate field error we expect
        assert!(e.to_string().contains("duplicate") || e.to_string().contains("Duplicate"));
    }

    Ok(())
}

The output:

Schema { fields: [Field { name: "date", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "ticker", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "first_value(value)[first_value]", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp@0", data_type: Timestamp(Nanosecond, Some("UTC")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "is_set", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "last_value(value)[last_value]", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp@0", data_type: Timestamp(Nanosecond, Some("UTC")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "is_set", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
Field: date
Field: ticker
Field: first_value(value)[first_value]
Field: timestamp@0
Field: is_set
Field: last_value(value)[last_value]
Field: timestamp@0
Field: is_set
Expected error due to duplicate state fields: Schema error: Schema contains duplicate unqualified field name "timestamp@0"

Expected behavior

No response

Additional context

No response

Metadata

Metadata

Assignees

Labels

bugSomething isn't workingregressionSomething that used to work no longer does

Type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions