DataFrame.except() does not work with structs in schema #10749

Closed
rtyler opened this issue Jun 1, 2024 · 4 comments · Fixed by #11117
Labels
bug Something isn't working

Comments

@rtyler
Contributor

rtyler commented Jun 1, 2024

Describe the bug

Calling except on two DataFrame objects fails when the schema contains Structs, but succeeds with simpler schemas.

For example, this works:

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int32,
            true,
        )]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
            ],
        )
        .unwrap();

        let updated_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])),
            ],
        )
        .unwrap();
        let _ = datafusion::arrow::util::pretty::print_batches(&[batch.clone()]);
        let _ = datafusion::arrow::util::pretty::print_batches(&[updated_batch.clone()]);

        let ctx = SessionContext::new();
        let before = ctx.read_batch(batch).expect("Failed to make DataFrame");
        let after = ctx.read_batch(updated_batch).expect("Failed to make DataFrame");

        let diff = before.except(after).expect("Failed to except").collect().await.expect("Failed to diff");
        assert_eq!(diff.len(), 1);

To Reproduce

        let nested_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("lat", DataType::Int32, true),
            Field::new("long", DataType::Int32, true),
        ]));
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", DataType::Int32, true),
            Field::new(
                "nested",
                DataType::Struct(nested_schema.fields.clone()),
                true,
            ),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("id", DataType::Int32, true)),
                        Arc::new(Int32Array::from(vec![1,2,3])) as ArrayRef
                    ),
                    (
                        Arc::new(Field::new("lat", DataType::Int32, true)),
                        Arc::new(Int32Array::from(vec![1,2,3])) as ArrayRef
                    ),
                    (
                        Arc::new(Field::new("long", DataType::Int32, true)),
                        Arc::new(Int32Array::from(vec![1,2,3])) as ArrayRef
                    ),
                ])),
            ],
        )
        .unwrap();

        let updated_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("id", DataType::Int32, true)),
                        Arc::new(Int32Array::from(vec![1,2,3])) as ArrayRef
                    ),
                    (
                        Arc::new(Field::new("lat", DataType::Int32, true)),
                        Arc::new(Int32Array::from(vec![1,2,3])) as ArrayRef
                    ),
                    (
                        Arc::new(Field::new("long", DataType::Int32, true)),
                        Arc::new(Int32Array::from(vec![1,2,3])) as ArrayRef
                    ),
                ])),
            ],
        )
        .unwrap();
        let _ = datafusion::arrow::util::pretty::print_batches(&[batch.clone()]);
        let _ = datafusion::arrow::util::pretty::print_batches(&[updated_batch.clone()]);

        let ctx = SessionContext::new();
        let before = ctx.read_batch(batch).expect("Failed to make DataFrame");
        let after = ctx.read_batch(updated_batch).expect("Failed to make DataFrame");

        let diff = before.except(after).expect("Failed to except").collect().await.expect("Failed to diff");
        assert_eq!(diff.len(), 1);
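
For reference, the result this repro is hoping for can be modelled without DataFusion as an anti-join on whole rows: keep every row of the left table that has no identical row in the right table. The sketch below is only an illustration under that assumption. `Row`, `Nested`, `except_rows`, and `sample` are hypothetical names that mirror the schema above, not DataFusion types, and duplicate-row handling is left out.

```rust
use std::collections::HashSet;

// Hypothetical stand-ins for the issue's schema; not DataFusion types.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Nested {
    id: Option<i32>,
    lat: Option<i32>,
    long: Option<i32>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Row {
    value: Option<i32>,
    nested: Option<Nested>,
}

/// Keeps the rows of `left` that have no identical row in `right`.
/// Rows are compared as a whole, including the nested struct, and
/// `None` is treated as equal to `None`.
fn except_rows(left: &[Row], right: &[Row]) -> Vec<Row> {
    let exclude: HashSet<&Row> = right.iter().collect();
    left.iter()
        .filter(|row| !exclude.contains(*row))
        .cloned()
        .collect()
}

/// Builds the three-row table from the repro, given the `value` column.
/// The nested struct of row n is always {id: n, lat: n, long: n}.
fn sample(values: [i32; 3]) -> Vec<Row> {
    values
        .iter()
        .zip(1..=3)
        .map(|(&value, n)| Row {
            value: Some(value),
            nested: Some(Nested {
                id: Some(n),
                lat: Some(n),
                long: Some(n),
            }),
        })
        .collect()
}
```

With `sample([1, 2, 3])` as the left side and `sample([1, 12, 3])` as the right side, the only surviving row is the one whose value is 2, which is the single differing row the assertion above is looking for.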

Expected behavior

I would expect the above to pass its assertions. Instead, this output is produced:

running 2 tests
test tests::test_simple ... ok
test tests::test_with_struct ... FAILED

failures:

---- tests::test_with_struct stdout ----
+-------+--------------------------+
| value | nested                   |
+-------+--------------------------+
| 1     | {id: 1, lat: 1, long: 1} |
| 2     | {id: 2, lat: 2, long: 2} |
| 3     | {id: 3, lat: 3, long: 3} |
+-------+--------------------------+
+-------+--------------------------+
| value | nested                   |
+-------+--------------------------+
| 1     | {id: 1, lat: 1, long: 1} |
| 12    | {id: 2, lat: 2, long: 2} |
| 3     | {id: 3, lat: 3, long: 3} |
+-------+--------------------------+
thread 'tests::test_with_struct' panicked at except-df-bug/src/lib.rs:74:84:
Failed to diff: ArrowError(InvalidArgumentError("Invalid comparison operation: Struct([Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"lat\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"long\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) IS NOT DISTINCT FROM Struct([Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"lat\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"long\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])"), None)
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    tests::test_with_struct

test result: FAILED. 1 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.01s
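
The operator named in the panic message, IS NOT DISTINCT FROM, is SQL's null-safe equality: unlike `=`, it never yields NULL and it treats NULL as matching NULL. A minimal sketch of the difference, using `Option` to stand in for nullable values (the function names and the `Point` alias are made up for illustration, and the struct case shows only one plausible reading of how the rule extends to nested values):

```rust
/// SQL `=`: the answer is unknown (None) as soon as either side is NULL.
fn sql_eq(a: Option<i32>, b: Option<i32>) -> Option<bool> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x == y),
        _ => None,
    }
}

/// SQL `IS NOT DISTINCT FROM`: always true or false, and NULL matches NULL.
/// `Option`'s own `PartialEq` already behaves this way.
fn is_not_distinct_from(a: Option<i32>, b: Option<i32>) -> bool {
    a == b
}

/// A nullable struct with three nullable fields, loosely modelled on the
/// `nested` column in the issue (hypothetical alias).
type Point = Option<(Option<i32>, Option<i32>, Option<i32>)>;

/// Null-safe equality applied to the struct as a whole and to each field.
fn points_not_distinct(a: &Point, b: &Point) -> bool {
    a == b
}
```

An EXCEPT has to decide whether two rows are the same row, so it needs the second form for every column, including struct columns.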

Additional context

I should also note that I tested this with DataFusion 37 and 38, with the same results.

@marvinlanhenke
Contributor

I just skimmed this real quick, so I might be wrong here.

But might the issue be rooted at arrow-rs itself:
https://github.com/apache/arrow-rs/blob/master/arrow-ord/src/cmp.rs#L219-L223

l_t.is_nested() is true in our case, which would return the Err.

@alamb @tustvold
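
To make the suspected failure mode concrete, here is a simplified model of a type check that rejects nested types before any values are compared. It is a sketch of the idea only: `DataType`, `Op`, and `check_comparable` are hypothetical and much smaller than the real arrow-rs definitions, and the error text merely imitates the message in the report.

```rust
// Simplified, hypothetical model; not the arrow-rs types or code.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Struct(Vec<(String, DataType)>),
    List(Box<DataType>),
}

impl DataType {
    /// True for types that contain other types.
    fn is_nested(&self) -> bool {
        matches!(self, DataType::Struct(_) | DataType::List(_))
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Equal,
    NotDistinct,
}

/// Rejects mismatched types and, in this model, every nested type,
/// regardless of which comparison operator was requested.
fn check_comparable(op: Op, left: &DataType, right: &DataType) -> Result<(), String> {
    if left != right || left.is_nested() {
        return Err(format!(
            "Invalid comparison operation: {:?} {:?} {:?}",
            left, op, right
        ));
    }
    Ok(())
}
```

Under this model two identical Struct types still fail the check, which matches the shape of the error in the report: both sides of the message show the same Struct type.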

@jayzhan211
Contributor

> I just skimmed this real quick, so I might be wrong here.
>
> But might the issue be rooted at arrow-rs itself: https://github.com/apache/arrow-rs/blob/master/arrow-ord/src/cmp.rs#L219-L223
>
> l_t.is_nested() in our case - which should return the Err.
>
> @alamb @tustvold

The next arrow-rs release will support nested comparison apache/arrow-rs#5792

@rtyler
Contributor Author

rtyler commented Jun 25, 2024

arrow 52 has been released with the fix, but the underlying Invalid comparison operation error still crops up. I'm trying to trace the code up through LogicalPlanBuilder to find where the error is being generated 😕

rtyler added a commit to buoyant-data/datafusion that referenced this issue Jun 25, 2024
…would work

This relies on newer functionality in arrow 52 and allows
DataFrame.except() to properly work on schemas with structs and lists

Closes apache#10749
@alamb
Contributor

alamb commented Jun 26, 2024

> The next arrow-rs release will support nested comparison apache/arrow-rs#5792

It seems like the distinct kernel still does not support nested comparisons (which is fine), but that is the reason #11117 was needed:

https://github.com/apache/arrow-rs/blob/main/arrow-ord/src/cmp.rs#L235-L238

I filed apache/arrow-rs#5960 to track adding this
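
As an illustration of why a missing distinct kernel need not be a blocker, a null-safe comparison can be layered on top of a plain equality predicate by handling the null cases explicitly. The sketch below assumes nothing about what #11117 actually changed. `not_distinct_mask` is a hypothetical helper over `Option` slices, not an arrow-rs or DataFusion API.

```rust
/// Element-wise "IS NOT DISTINCT FROM" for two equal-length columns,
/// built from an ordinary equality predicate plus explicit null checks.
fn not_distinct_mask<T>(
    left: &[Option<T>],
    right: &[Option<T>],
    eq: impl Fn(&T, &T) -> bool,
) -> Vec<bool> {
    assert_eq!(left.len(), right.len(), "columns must have equal length");
    left.iter()
        .zip(right)
        .map(|(l, r)| match (l, r) {
            // Both present: defer to the equality predicate.
            (Some(a), Some(b)) => eq(a, b),
            // Both null: not distinct.
            (None, None) => true,
            // Exactly one null: distinct.
            _ => false,
        })
        .collect()
}
```

Any equality that understands nested values can be passed as `eq`, for example `|a, b| a == b` over tuples standing in for structs.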

@alamb alamb closed this as completed in d2ff218 Jun 27, 2024
findepi pushed a commit to findepi/datafusion that referenced this issue Jul 16, 2024
…would work (apache#11117)

This relies on newer functionality in arrow 52 and allows
DataFrame.except() to properly work on schemas with structs and lists

Closes apache#10749