Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move SMJ join filtered part out of join_output stage. LeftOuter, LeftSemi #12764

Merged
merged 8 commits into from
Oct 18, 2024

Conversation

comphead
Copy link
Contributor

@comphead comphead commented Oct 4, 2024

Which issue does this PR close?

Related to #11555
Related to #12359

Closes #.

Rationale for this change

Move filtered logic out of join_output stage.

The problem is for filtered joins there is extra step need, e.g to calculate final filtered mask for the specific row the algorithm, it needs the knowledge for every right row processed for the given left row.

For example for LeftOuter join :
select * from t1 join t2 on (t1.a = t2.a and t1.b > t2.b)

t1:

a b
1 10

t2:

a b
1 9
1 11

Currently the Join will output

a b a b
1 10
1 10 1 11

which is incorrect, the first null matched row shouldn't be out as for given row there is a match exists.

The problem is records calculated depending on batch size and the output can be called anytime once the output size hit the batch size. So with batch size == 1 in this scenario the first row will be out because output == 1 which is equal to batch_size but the algorithm on this stage have no idea that another matched row is coming.

The idea of algorithm is to move filtered algorithm out of join stage and be not dependent on batch size. Instead it will be called once left row index switched to next index, to be sure every right row is processed for given left row, thus it will be still in batches although not strictly equal to batch_size.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate labels Oct 4, 2024
@comphead
Copy link
Contributor Author

comphead commented Oct 4, 2024

@korowa @viirya Please have a look on prototype with just Left Outer join moved out of join output stage. Let me know your thoughts. With this approach I hope we can handle all other filtered cases like right, semi, anti

@comphead
Copy link
Contributor Author

comphead commented Oct 4, 2024

@alamb cc

pub streamed_batch_counter: AtomicUsize,
}

struct JoinedRecordBatches {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need more information to track the filtered bitmask across incoming batches

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please comment what the fields represent -- In particular how is filter_mask, row_indices and batch_ids interpreted relative to the batches (do they always have the same row count? What does the batch represent?

seen_true = true;
corrected_mask.append_value(true);
} else if seen_true || !filter_mask.value(i) && !last_index_for_row {
corrected_mask.append_null(); // to be ignored and not set to output
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NULL is the row shouldnot go to the output

} else if seen_true || !filter_mask.value(i) && !last_index_for_row {
corrected_mask.append_null(); // to be ignored and not set to output
} else {
corrected_mask.append_value(false); // to be converted to null joined row
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

false means for null joined rows

@@ -733,7 +786,23 @@ impl Stream for SMJStream {
match self.current_ordering {
Ordering::Less | Ordering::Equal => {
if !streamed_exhausted {
if self.join_type == JoinType::Left
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ifs is part of WIP. if the approach is okay we can move all join types under it and the only if stmt will be if the join is filtered or not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filtering phase happens when the left row index gets changed to ensure all right rows processed for the given row

{
record_batch
} else {
RecordBatch::new_empty(Arc::clone(&self.schema))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this block is magic, but without it the join stucks, I believe it is something with output sizes. We can improve it later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could change it to continue to avoid outputting an empty batch 🤔

self.freeze_streamed()?;
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(batch.num_rows());
self.streamed_batch =
StreamedBatch::new(batch, &self.on_streamed);
self.streamed_batch_counter
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to track batch ids, to prevent edge case like
Batch 0 value 1 streamed index 0
Batch 1 value 2 streamed index 0

So the streamed index is the same, and without batch id its not possible to figure out the actual values are different

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please put that context as comments of the docs of stream_batch_counter?

@@ -1330,10 +1433,10 @@ impl SMJStream {

let columns = if matches!(self.join_type, JoinType::Right) {
buffered_columns.extend(streamed_columns.clone());
buffered_columns
buffered_columns.clone()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clones will be removed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you still plan to remove the clones?

if self.join_type == JoinType::Left {
self.output_record_batches
.batches
.push(output_batch.clone());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Left WIP we pass the original non filtered batch because filtering will be done later.

compute::filter_record_batch(&output_batch, mask)?;
self.output_record_batches.batches.push(filtered_batch);
}
self.output_record_batches.filter_mask.extend(mask);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collect all the neccessary extra information across batches

.0;
if matches!(self.join_type, JoinType::Right | JoinType::Full) {
// The reverse of the selection mask. For the rows not pass join filter above,
// we need to join them (left or right) with null rows for outer joins.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not needed it doesn't have left join, hopefully we can remove it

@@ -1520,9 +1639,79 @@ impl SMJStream {
} else {
self.output_size -= record_batch.num_rows();
}
self.output_record_batches.clear();
if self.filter.is_none() || self.join_type != JoinType::Left {
self.output_record_batches.batches.clear();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for filtered joins we dont need to clear batches as we need them later, I'll do it rhough the flag later

Ok(record_batch)
}

fn filter_joined_batch(&mut self) -> Result<RecordBatch> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the main logic: But in fact what it does it just consolidate batches, filtering information and does the filtering in very similar way we currently do in freeze_streamed

}

#[tokio::test]
async fn test1() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be removed

@@ -134,8 +134,103 @@ async fn test_left_join_1k_filtered() {
JoinType::Left,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[JoinTestType::NljHj], false)
.run_test(&[JoinTestType::HjSmj], false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test passes now

@korowa
Copy link
Contributor

korowa commented Oct 4, 2024

@comphead, I also was checking for these two issues, and found that currently SMJ contains some utility logic used in other join operators, but reimplements it in its own way. In addition the data processing could be implemented in more similar to other joins fashion -- currently it's kind of hard to track what's going on there.

I'm currently trying to rework it here, and I've got fuzz tests passing (but there are other issues), but it still needs some cleanups, proper working with spills and comments/docs.

@comphead
Copy link
Contributor Author

comphead commented Oct 4, 2024

@comphead, I also was checking for these two issues, and found that currently SMJ contains some utility logic used in other join operators, but reimplements it in its own way. In addition the data processing could be implemented in more similar to other joins fashion -- currently it's kind of hard to track what's going on there.

I'm currently trying to rework it here, and I've got fuzz tests passing (but there are other issues), but it still needs some cleanups, proper working with spills and comments/docs.

Thanks @korowa the way how SMJ implemented is truly hard to understand, it took me a couple of painful weeks to dig through it.

I feel you propose to go into 2 directions?
To unblock some projects that already relies on DF SMJ , we can fix the current SMJ Leftouter and other filtered variants with approach above or simialer and in parallel have your new implementation reviewed and onboarded?

@korowa
Copy link
Contributor

korowa commented Oct 6, 2024

I feel you propose to go into 2 directions?

@comphead sure, we can, they are not mutually exclusive and, likely, current approach will be completed and delivered faster, as it changes less things. I just wanted to say that (ideally, if possible) this fix could potentially be achieved by already reusing utility functions from other join implementations (like adjust_indices_by_join_type).

Regarding this fix -- it seems now there are two places doing almost the same thing (please correct me if I'm wrong)

  1. filtering batches and correcting filter mask while staging output batches (applying filter and get_filtered_join_mask in freeze_all)
  2. filtering batches and correcting mask while producing output batches (filter_joined_batch which calls get_corrected_filter_mask)

Also, get_filtered_join_mask is, likely, intended to do exactly the same thing as get_corrected_filter_mask due to its comment line:

/// This return a tuple of:
/// - corrected mask with respect to the join type

So, maybe there is a chance, that current implementation of filtering and aligning output indices according to the join type logic can be located in a single place for all join types, using the same functions, rather than being spreaded across two processing stages?

@comphead
Copy link
Contributor Author

comphead commented Oct 6, 2024

Thanks @korowa for the feedback, this is not a final PR, the goal was to discuss the direction, having LeftOuter filtered join as an example.

If we okay with the example I'm planning to move all the filtered logic into new single place and make the code cleaner.

Thanks for adjust_indices_by_join_type hint, I'll check if we can reuse it after getting the code restructured like mentioned above, most likely in separate PR to make things manageable.

@comphead comphead changed the title WIP: move filtered join out of join_output stage. LeftOuter experiment WIP: move SMJ join filtered part out of join_output stage. LeftOuter experiment Oct 7, 2024
@comphead
Copy link
Contributor Author

comphead commented Oct 8, 2024

@korowa I'm planning to move all other join variants to the same approach so the filtered logic will be in a single place, test it out and make the PR ready for your review

@@ -229,6 +227,7 @@ async fn test_anti_join_1k() {
#[tokio::test]
// flaky for HjSmj case, giving 1 rows difference sometimes
// https://github.com/apache/datafusion/issues/11555
#[ignore]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be enabled when Left Anti has moved out of join partial as well

@comphead comphead changed the title WIP: move SMJ join filtered part out of join_output stage. LeftOuter experiment Move SMJ join filtered part out of join_output stage. LeftOuter, LeftSemi Oct 16, 2024
@comphead
Copy link
Contributor Author

comphead commented Oct 16, 2024

@korowa @viirya @alamb can I get the review please on this on the final PR moving the filtering SMJ logic out of join partial phase. Its done for Left/LeftSemi/Inner Join.

Right Joins, Full, Anti I'm planning to move right after this PR has approved

Please do not put much attention on disabled tests, as before the filtered join was unstable anyway

@alamb
Copy link
Contributor

alamb commented Oct 16, 2024

I will plan to review this either later today or tomorrow

if self.filter.is_some()
&& matches!(
self.join_type,
JoinType::Left | JoinType::LeftSemi
Copy link
Contributor Author

@comphead comphead Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is multiple if statements of this kind just because we moved only 2 types of join

@comphead
Copy link
Contributor Author

I will plan to review this either later today or tomorrow

Thanks @alamb

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @comphead -- I think this seems like a nice improcement to me. I left some stylistic comments that I think would help encapsulation, etc

My only real concern about this PR is the commented out tests. I noticed that you say

Please do not put much attention on disabled tests, as before the filtered join was unstable anyway

I don't understand what this means. Do you mean that while the tests passed, the actual output of SortMergeJoin was likely not correct and thus the code with this change will not be any less correct.

I think once that is sorted out this PR would be good to go

@@ -100,13 +100,14 @@ Alice 100 Alice 2
Alice 50 Alice 1
Alice 50 Alice 2

# Uncomment when filtered RIGHT moved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean that if we merge this PR the answers are incorrect (aka that we will be introducing a regression for some period of time?)

Copy link
Contributor Author

@comphead comphead Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Speaking to risks, SMJ is experimental and disabled by default. Even now each filtered SMJ variant has a correctness issue depending on data distribution. Despite the fact that simple tests passed, the fuzz tests are commented out for filtered variants in main for the reason above .

I understand the concern, but moving all variants to new approach will make the PR unmanageable to review. I'll try to move other variants as fast as possible, moreover the most wide used variants(Inner, Left, LeftSemi) are moved in this PR

Copy link
Contributor Author

@comphead comphead Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issues #11555
#12359 focus on SMJ correctness issues. Fuzz tests flaky because filtered SMJ has a problem occurring spontaneously and depending on incoming data distribution. Answering your question, yes SMJ has issues and this PR is to make SMJ to work correctly and fix current bugs

pub streamed_batch_counter: AtomicUsize,
}

struct JoinedRecordBatches {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please comment what the fields represent -- In particular how is filter_mask, row_indices and batch_ids interpreted relative to the batches (do they always have the same row count? What does the batch represent?

self.freeze_streamed()?;
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(batch.num_rows());
self.streamed_batch =
StreamedBatch::new(batch, &self.on_streamed);
self.streamed_batch_counter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please put that context as comments of the docs of stream_batch_counter?

@@ -1330,10 +1433,10 @@ impl SMJStream {

let columns = if matches!(self.join_type, JoinType::Right) {
buffered_columns.extend(streamed_columns.clone());
buffered_columns
buffered_columns.clone()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you still plan to remove the clones?

{
self.freeze_all()?;

if !self.output_record_batches.batches.is_empty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if adding some methods on JoinedRecordBatches would make this code easier to read - For example instead of directly accessing the fields, perhaps adding functions would allow

Suggested change
if !self.output_record_batches.batches.is_empty()
if !self.output_record_batches.is_empty()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That totally makes sense, I feel it requires more methods to add(add/is_empty/clear), I'd prefer to do it as follow up, this PR is too large imho

{
record_batch
} else {
RecordBatch::new_empty(Arc::clone(&self.schema))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could change it to continue to avoid outputting an empty batch 🤔

@comphead
Copy link
Contributor Author

comphead commented Oct 17, 2024

Thanks @alamb for the feedback I'll address the comments asap

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like an improvement to me - thank you @comphead

Let's get this in and keep iterating

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants