Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SMJ Left Anti Join when the join filter is set #10724

Merged
merged 3 commits into from
May 31, 2024
Merged

Conversation

comphead
Copy link
Contributor

Which issue does this PR close?

Closes #10380.

Rationale for this change

Fix issues when LeftAnti Join returned nothing when the join filter was set

What changes are included in this PR?

Handled scenarios for LeftAntiJoin when join filter is set. Output the streaming row(left join side row) only if there is no corresponding filtered match from buffered data(right join side)

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label May 30, 2024
@comphead comphead requested review from viirya and alamb May 30, 2024 15:52
@comphead comphead changed the title Fix Left Anti Join when the join filter is set Fix SMJ Left Anti Join when the join filter is set May 30, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @comphead -- this looks like an improvement to me. Queries now get correct answers that did not before.

I remember that Anti join semantics are quite subtle, and I am not sure I would catch subtle bugs in my review, but it seems to me that this code is an improvement to what is on main

nice work

@@ -1181,7 +1189,10 @@ impl SMJStream {
let filter_columns = if chunk.buffered_batch_idx.is_some() {
if matches!(self.join_type, JoinType::Right) {
get_filter_column(&self.filter, &buffered_columns, &streamed_columns)
} else if matches!(self.join_type, JoinType::LeftSemi) {
} else if matches!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we have to check (in the lines above) for RightAnti and RightSemi? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a separate ticket for Right* joins, however I'm not sure how to build a right join on the sql level tbh. I will check it once we have Left* stabilized

@@ -1397,6 +1416,9 @@ fn get_filter_column(
.map(|i| buffered_columns[i.index].clone())
.collect::<Vec<_>>();

// dbg!(&left_columns);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks left over and could probably be removed

JoinType::LeftAnti => {
// have we seen a filter match for a streaming index before
for i in 0..streamed_indices_length {
if mask.value(i) && !seen_as_true {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vaguely remember there are subtle semantics with respect to NULLs and Anti joins. It looks like this doe the right thing (looks for true values in the mask). However i wonder if you also should check if mask.valid() as well first ?

This may not be necessary if mask is always non nullable. I am not sure

Copy link
Contributor Author

@comphead comphead May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a confusion between 0 and nulls, good point, but not sure if its applicanble for booleans, I'll check it

UPD: there is a collision between nulls and default values. so for boolean null value will be fetched as false which is okay for this logic as we seeking for the true, and null index cannot be fetched as true

@comphead
Copy link
Contributor Author

Thank you @comphead -- this looks like an improvement to me. Queries now get correct answers that did not before.

I remember that Anti join semantics are quite subtle, and I am not sure I would catch subtle bugs in my review, but it seems to me that this code is an improvement to what is on main

nice work

Thanks @alamb I have added the basic left anti join test cases to slt file. However once we resolve the #10659 it gonna be easier to find issues then we cannot imagine right away

@alamb alamb merged commit d6ddd23 into apache:main May 31, 2024
23 checks passed
@alamb
Copy link
Contributor

alamb commented May 31, 2024

Let's keep iterating on this code in subsequent PRs

Comment on lines +1492 to +1494
// LeftAnti semantics: return true if for every x in the collection, p(x) is false.
// the true(if any) flag needs to be set only once per streaming index
// to prevent duplicates in the output
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unclear what p(x) is as there is no such variable. You can say "join filter" instead of a obscure term.

Some((corrected_mask.finish(), filter_matched_indices))
}
// LeftAnti semantics: return true if for every x in the collection, p(x) is false.
// the true(if any) flag needs to be set only once per streaming index
Copy link
Member

@viirya viirya May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the true(if any) flag" -> "The corrected_mask values for same streaming index". We should keep the comment as direct and clear as possible.

@comphead
Copy link
Contributor Author

Thanks @alamb and @viirya I'll address all of the comments in followup PR

Comment on lines +1506 to +1507
|| (i == streamed_indices_length - 1
&& *scanning_buffered_batch_idx == buffered_batches_len - 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second condition is not in the above comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the second condition is for the last index in streamed_indices and last buffered batch. But in last buffered batch, there are many buffered indices. It is possible we are in the last buffered batch, but we still have more buffered indices to process. Isn't this put a value into corrected_mask too early so it could be incorrect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the second condition is for the last index in streamed_indices and last buffered batch. But in last buffered batch, there are many buffered indices. It is possible we are in the last buffered batch, but we still have more buffered indices to process. Isn't this put a value into corrected_mask too early so it could be incorrect?

Good point, I'll check that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Sort Merge Join. LeftAnti issues
3 participants