
Intermittent failures in fuzz_cases::join_fuzz::test_anti_join_1k_filtered #11555

Closed · Tracked by #9846 · Fixed by #11604
alamb opened this issue Jul 19, 2024 · 14 comments
Assignees: comphead
Labels: bug (Something isn't working)

Comments

@alamb (Contributor) commented Jul 19, 2024

Describe the bug

I have seen this test fail twice now on two unrelated PRs:

#11540: https://github.com/apache/datafusion/actions/runs/10011021548/job/27673684873?pr=11540

#11527: https://github.com/apache/datafusion/actions/runs/10001535587/job/27645407724?pr=11527

---- fuzz_cases::join_fuzz::test_anti_join_1k_filtered stdout ----
thread 'fuzz_cases::join_fuzz::test_anti_join_1k_filtered' panicked at datafusion/core/tests/fuzz_cases/join_fuzz.rs:537:17:
assertion `left == right` failed: HashJoinExec and SortMergeJoinExec produced different row counts, batch_size: 1
  left: 970
 right: 971
stack backtrace:

And

---- fuzz_cases::join_fuzz::test_anti_join_1k_filtered stdout ----
thread 'fuzz_cases::join_fuzz::test_anti_join_1k_filtered' panicked at datafusion/core/tests/fuzz_cases/join_fuzz.rs:537:17:
assertion `left == right` failed: HashJoinExec and SortMergeJoinExec produced different row counts, batch_size: 1
  left: 952
 right: 953

To Reproduce

Not sure -- it is happening on CI intermittently

Expected behavior

No response

Additional context

No response

alamb added the bug (Something isn't working) label on Jul 19, 2024
@alamb (Contributor, Author) commented Jul 19, 2024

Could potentially be related to #11535

@alamb (Contributor, Author) commented Jul 19, 2024

comphead self-assigned this on Jul 22, 2024
@comphead (Contributor) commented:

I think the cause may be similar to https://github.com/apache/datafusion/pull/11041/files#r1648318160

I'll work on a fix. Thanks @alamb for reporting it.

@alamb (Contributor, Author) commented Jul 22, 2024

🤔 it seems to have happened again on main right after #11604 was merged:

https://github.com/apache/datafusion/actions/runs/10046115241/job/27764888311

@comphead (Contributor) commented:

I have disabled the test for now. I'll spend more time investigating why this happens.

@comphead (Contributor) commented Jul 25, 2024

I'm still on it. It involves a pretty tricky condition where matches cross buffered batches.

Update: I built a repro and am working on a solution.

@comphead (Contributor) commented Jul 28, 2024

I found that the problem happens when, for a single streamed row, there are multiple matching buffered rows and those buffered rows are spread across separate batches. In this case the DataFusion SMJ reacts to the first batch without knowing that the next one is coming. I'm still experimenting to find a solution, even a hacky one.
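To make that failure mode concrete, here is a minimal standalone sketch (plain Rust, not DataFusion's SMJ code; the (key, payload) tuples, the payload < bpayload stand-in for the column filter, and both function names are invented for illustration). It contrasts an anti-join decision made per key, after every buffered batch has been seen, with one made from the first buffered batch containing the key, which emits an extra output row analogous to the 970 vs. 971 row-count mismatch above.

/// Correct LEFT ANTI semantics for this sketch: keep a streamed row only if no
/// buffered row in ANY batch matches the key and passes the filter.
fn anti_join_per_key(
    streamed: &[(i32, i32)],
    buffered_batches: &[Vec<(i32, i32)>],
) -> Vec<(i32, i32)> {
    streamed
        .iter()
        .copied()
        .filter(|&(key, payload)| {
            !buffered_batches
                .iter()
                .flatten()
                .any(|&(bkey, bpayload)| bkey == key && payload < bpayload)
        })
        .collect()
}

/// Buggy variant: the decision is made from the first batch containing the key,
/// mirroring "SMJ reacts on the first batch without knowing the next one is coming".
fn anti_join_first_batch_only(
    streamed: &[(i32, i32)],
    buffered_batches: &[Vec<(i32, i32)>],
) -> Vec<(i32, i32)> {
    streamed
        .iter()
        .copied()
        .filter(|&(key, payload)| {
            match buffered_batches
                .iter()
                .find(|batch| batch.iter().any(|&(k, _)| k == key))
            {
                // only the first matching batch is consulted
                Some(batch) => !batch
                    .iter()
                    .any(|&(bkey, bpayload)| bkey == key && payload < bpayload),
                None => true,
            }
        })
        .collect()
}

fn main() {
    // One streamed row; rows with the same key sit in BOTH buffered batches,
    // but only the row in the second batch passes the filter.
    let streamed = vec![(2, 5)];
    let buffered = vec![vec![(2, 1)], vec![(2, 9)]];

    // Deciding per key suppresses the streamed row (correct anti-join output).
    assert_eq!(anti_join_per_key(&streamed, &buffered).len(), 0);
    // Deciding from the first batch only emits one extra row.
    assert_eq!(anti_join_first_batch_only(&streamed, &buffered).len(), 1);
    println!("sketch assertions passed");
}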

@comphead (Contributor) commented:

The repro test case:

#[tokio::test]
async fn test_cross_b_bb() {
    // Only used to obtain a schema compatible with the fuzz-test helpers.
    let left: Vec<RecordBatch> = make_staggered_batches(1);

    // Single streamed (left) row.
    let left = vec![RecordBatch::try_new(
        left[0].schema().clone(),
        vec![
            Arc::new(Int32Array::from(vec![2])),
            Arc::new(Int32Array::from(vec![8])),
            Arc::new(Int32Array::from(vec![-18656817])),
            Arc::new(Int32Array::from(vec![2133711598])),
        ],
    )
    .unwrap()];

    // Two buffered (right) batches that both contain the streamed row's key (2, 8).
    let right = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![2])),
                Arc::new(Int32Array::from(vec![8])),
                Arc::new(Int32Array::from(vec![1875176725])),
                Arc::new(Int32Array::from(vec![728454380])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![2])),
                Arc::new(Int32Array::from(vec![8])),
                Arc::new(Int32Array::from(vec![102493212])),
                Arc::new(Int32Array::from(vec![161372512])),
            ],
        )
        .unwrap(),
    ];

    JoinFuzzTestCase::new(
        left,
        right,
        JoinType::LeftAnti,
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&[JoinTestType::HjSmj], false)
    .await;
}

@comphead (Contributor) commented Aug 8, 2024

Attached a more accurate test case:

#[tokio::test]
async fn test_cross() {
    let left: Vec<RecordBatch> = make_staggered_batches(1);

    // Streamed (left) side: an empty batch followed by a batch with four rows.
    let left = vec![
        RecordBatch::new_empty(left[0].schema().clone()),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 1, 1])),
                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
                Arc::new(Int32Array::from(vec![1001, 1002, 1003, 1004])),
                Arc::new(Int32Array::from(vec![10011, 10021, 10031, 10051])),
            ],
        ).unwrap()
    ];

    // Buffered (right) side: rows for the (1, 3) key appear at the end of the
    // first batch and at the start of the second, crossing the batch boundary.
    let right = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])),
                Arc::new(Int32Array::from(vec![1, 2, 3, 1, 2, 3])),
                Arc::new(Int32Array::from(vec![2001, 2002, 2003, 2004, 2005, 2006])),
                Arc::new(Int32Array::from(vec![20011, 20021, 20031, 20041, 20051, 20061])),
            ],
        ).unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 1, 1, 1, 2, 2, 2])),
                Arc::new(Int32Array::from(vec![3, 3, 4, 5, 6, 7, 8])),
                Arc::new(Int32Array::from(vec![3000, 3001, 3002, 3003, 3004, 3005, 3006])),
                Arc::new(Int32Array::from(vec![30001, 30011, 30021, 30031, 30041, 30051, 30061])),
            ],
        ).unwrap(),
    ];

    JoinFuzzTestCase::new(
        left,
        right,
        JoinType::LeftAnti,
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&[JoinTestType::HjSmj], false)
    .await;
}

@comphead (Contributor) commented:

Well, the problem is that the anti join needs to wait until the very last matching right batch has been read for the respective left row.

I tried a couple of options for identifying that very last right batch:

self.buffered_data.scanning_offset == 0
or
self.buffered_data.scanning_finished()

But each of them has its own false positives or false negatives. Perhaps we need a separate function or index to determine the very last batch. @korowa, do you have any other ideas on that? You have contributed a lot to SMJ, so I'd appreciate your help.

@korowa (Contributor) commented Aug 15, 2024

From what I remember -- doesn't SMJ already fetch the buffered side until it meets the first key that is not equal to the current streamed-side value (the PollingRest buffered state)? That seems to be the solution for waiting for the last batch of a key: for any join type (normal/semi/anti), by the time processing of a streamed row starts, all required rows from the buffered side should already have been read and stored in memory.

Or maybe I've misunderstood the problem?
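For illustration, a minimal standalone sketch of that idea (not DataFusion's SMJ internals; collect_matches_for_key and the (key, payload) tuples are invented, and it assumes the buffered side is globally sorted on the key): keep pulling from the buffered side, across batch boundaries, until the first key greater than the streamed key, and only then evaluate the anti-join predicate.

/// Gather every buffered row whose key equals `streamed_key`, continuing into
/// the next batch when a batch ends while still on the same key.
fn collect_matches_for_key(batches: &[Vec<(i32, i32)>], streamed_key: i32) -> Vec<(i32, i32)> {
    let mut matches = Vec::new();
    for batch in batches {
        for &(key, payload) in batch {
            if key < streamed_key {
                continue; // haven't reached the streamed key yet
            }
            if key > streamed_key {
                return matches; // first non-equal key: the group is complete
            }
            matches.push((key, payload));
        }
        // end of batch reached while still on the streamed key: do NOT decide
        // here; fall through and keep reading the next buffered batch
    }
    matches
}

fn main() {
    // Matches for key 2 are split across two buffered batches.
    let buffered = vec![vec![(1, 10), (2, 1)], vec![(2, 9), (3, 7)]];
    let group = collect_matches_for_key(&buffered, 2);
    assert_eq!(group, vec![(2, 1), (2, 9)]);

    // The anti-join predicate is applied only after the whole group is gathered,
    // so the passing match in the later batch is not missed.
    let streamed_payload = 5;
    let has_passing_match = group.iter().any(|&(_, p)| streamed_payload < p);
    assert!(has_passing_match); // the streamed row would NOT be emitted
}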

@comphead (Contributor) commented:

Thanks @korowa, that is something I'm also trying. I hope to open a PR soon and will add you as a reviewer.

@comphead (Contributor) commented:

I think this local test may cover a lot of cases:

#[tokio::test]
async fn test_cross_1() {
    let left: Vec<RecordBatch> = make_staggered_batches(1);

    let left = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 0, 0, 0, 0])),
                Arc::new(Int32Array::from(vec![10, 11, 15, 20, 30, 30, 30])),
                Arc::new(Int32Array::from(vec![110, 111, 115, 120, 129, 130, 131])),
                Arc::new(Int32Array::from(vec![1100, 1110, 1150, 1200, 1290, 1300, 1310])),
            ],
        ).unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 0])),
                Arc::new(Int32Array::from(vec![30, 40, 50, 70])),
                Arc::new(Int32Array::from(vec![132, 140, 150, 170])),
                Arc::new(Int32Array::from(vec![1320, 1400, 1500, 1700])),
            ],
        ).unwrap()
    ];

    // Buffered (right) side: rows for the (0, 30) key span the third and
    // fourth of these batches.
    let right = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0])),
                Arc::new(Int32Array::from(vec![10])),
                Arc::new(Int32Array::from(vec![1100])),
                Arc::new(Int32Array::from(vec![11011])),
            ],
        ).unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0])),
                Arc::new(Int32Array::from(vec![20])),
                Arc::new(Int32Array::from(vec![2100])),
                Arc::new(Int32Array::from(vec![21011])),
            ],
        ).unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0])),
                Arc::new(Int32Array::from(vec![30, 30, 30])),
                Arc::new(Int32Array::from(vec![3100, 3101, 3102])),
                Arc::new(Int32Array::from(vec![31001, 31011, 31021])),
            ],
        ).unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 0])),
                Arc::new(Int32Array::from(vec![30, 30, 30, 40])),
                Arc::new(Int32Array::from(vec![3110, 3111, 3112, 4099])),
                Arc::new(Int32Array::from(vec![31101, 31111, 31121, 40991])),
            ],
        ).unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0])),
                Arc::new(Int32Array::from(vec![40, 49])),
                Arc::new(Int32Array::from(vec![4100, 6100])),
                Arc::new(Int32Array::from(vec![41011, 61011])),
            ],
        ).unwrap(),
    ];

    JoinFuzzTestCase::new(
        left,
        right,
        JoinType::LeftAnti,
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&[JoinTestType::HjSmj], false)
    .await;
}

My initial thought was to do something like this:

  • Get all matched batches for a particular streamed row.
  • Increase a counter each time freeze_streamed is called until we hit the number of matched batches, at which point all the data has presumably been processed.

But this approach has a flaw: for a given streamed row like in the test above, the (0, 30) key has two matching batches, but the way freeze_streamed gets called is not deterministic.

So for (0, 30) there are 2 batches with 3 matched rows each. Sometimes they are processed in 6 calls, 1 buffered row per call; other times in 1 call covering 3 rows, followed by 3 calls of 1 buffered row each.
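A toy simulation of that non-determinism (standalone Rust, not DataFusion code; the chunkings below simply mirror the two call patterns described above). The number of calls varies between runs, while the number of buffered rows those calls cover always adds up to the same total, which is why counting calls against the number of matched batches is unreliable.

fn main() {
    // Two buffered batches with 3 matching rows each for the (0, 30) key.
    let total_matched_rows: i32 = 6;

    // Two chunkings of the same work that have both been observed
    // (buffered rows handled per freeze_streamed-style call):
    let run_a = vec![1, 1, 1, 1, 1, 1]; // 6 calls, 1 buffered row per call
    let run_b = vec![3, 1, 1, 1]; // 1 call covering 3 rows, then 3 calls of 1 row

    for (name, chunks) in [("run_a", &run_a), ("run_b", &run_b)] {
        let calls = chunks.len();
        let rows: i32 = chunks.iter().sum();
        println!("{name}: {calls} calls, {rows} rows covered");
        // The covered-row count is stable across runs...
        assert_eq!(rows, total_matched_rows);
    }
    // ...while the call count is not, so counting calls can't signal completion.
    assert_ne!(run_a.len(), run_b.len());
}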

@comphead (Contributor) commented Nov 5, 2024

This can be closed

comphead closed this as completed on Nov 5, 2024