Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sort-merge join #2242

Merged
merged 10 commits into from
Apr 22, 2022
Merged

Implement sort-merge join #2242

merged 10 commits into from
Apr 22, 2022

Conversation

richox
Copy link
Contributor

@richox richox commented Apr 15, 2022

Which issue does this PR close?

Closes #141.

Rationale for this change

related to #1599

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Apr 15, 2022
@alamb
Copy link
Contributor

alamb commented Apr 15, 2022

I hope to find time to review this more carefully tomorrow

@alamb
Copy link
Contributor

alamb commented Apr 15, 2022

cc @Dandandan and @tustvold

@yjshen yjshen marked this pull request as draft April 16, 2022 10:35
@yjshen yjshen marked this pull request as ready for review April 18, 2022 07:15
@yjshen yjshen changed the title Draft implementing sort-merge join Implement sort-merge join Apr 18, 2022
@alamb
Copy link
Contributor

alamb commented Apr 18, 2022

I plan to review this PR first thing tomorrow morning US eastern time (~ 6AM or so)

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @richox -- quite an impressive "first PR" 🥇

I also found the code a joy to read (it was well commented and well structured)

I didn't have time to review every of the various state transitions in precise detail (this is a large PR!) or all the tests, but the ones I read made sense. Also the test case coverage is very a good start

Some follow on comments / suggestions:

  1. I think this code would be better named MergeJoin as it appears to assume the input is already sorted rather than re-sorting. It would be good to make this clear in the comments of module.
  2. I think the coverage of the various stream corner cases (RecordBatch boundaries) are not well covered. I think fuzz testing could help a lot
  3. It appears to me that this implements the SortMergeJoin operator but does not use it in any plans (yet). I wonder you can comment / link to your thoughts on how this operator will be used?

Regarding plans: I am particularly interested in dynamically switching from a HashJoin to SortMergeJoin algorithm dynamically when memory is exhausted. I have been involved in planners that get join orders / algorithm choice "wrong" due to insufficient statistics, correlated predicates, poor cost models etc, and I think dynamic behavior is the best approach to avoid such calamities.

So in conclusion, I think this PR could be merged and we can keep iterating on it when it is part of the code base; Nice work @richox

@@ -566,6 +566,7 @@ pub mod metrics;
pub mod planner;
pub mod projection;
pub mod repartition;
pub mod sort_merge_join;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow on PR it might be nice to move all the join code into a joins directory -- like

datafusion/core/src/physical_plan/joins/sort_merge.rs
datafusion/core/src/physical_plan/joins/hash.rs
datafusion/core/src/physical_plan/joins/cross.rs

etc

right: Arc<dyn ExecutionPlan>,
on: JoinOn,
join_type: JoinType,
sort_options: Vec<SortOptions>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what the usecase is for different sort_options being passed in? As in, did you consider always using some specific option like ASC NULLS FIRST for all column types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guest there may be some chance to reduce extra sorting if we support different ordering for different columns. for example:

select
    c, d
from (
    select
        a, b, c
    from table1
    order by
        a ASC,
        b DESC
    ) t1
join (
    select
        a, b, d
    from table2
    order by
        a ASC,
        b DESC
    ) t2
on t2.a = t1.a and t2.b = t1.b

in the above case, column a and b are sorted in different directions. if we support different ordering, we need no extra SortExec before joining.

};

// execute children plans
let streamed = CoalescePartitionsExec::new(streamed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is clever. 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get this, why are we coalescing all partitions from streamed into a single stream? Shouldn't we do a partition-wise join?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, I don't think CoalescePartitionsExec would preserve sort order, making merging with two pointers impossible.

partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let (streamed, buffered, on_streamed, on_buffered) = match self.join_type {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the terminology of buffered and streamed is very nice

} else {
self.num_buffered_columns
});
let (streamed_output, buffered_output) = if self.join_type != JoinType::Right {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused here because I thought JoinType::Right always swapped the streamed / buffered outputs:

https://github.com/apache/arrow-datafusion/pull/2242/files#diff-e7234e2d6a85330a8c23a2a2c2fbc73a383548ff2e48f65458e4f424b07df14eR171-R176

Why does this need to swap it again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. for right-join, streamed exactly points to the right child and buffered points to the left. but the output columns are still left to right. so the references to output columns also need to be swapped here.

for ((left_array, right_array), sort_options) in
left_arrays.iter().zip(right_arrays).zip(sort_options)
{
macro_rules! compare_value {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @yjshen 's RowFormat will be able to hopefully make this kind of code faster and easier to follow

for (left_array, right_array) in left_arrays.iter().zip(right_arrays) {
macro_rules! compare_value {
($T:ty) => {{
let left_array = left_array.as_any().downcast_ref::<$T>().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the future, using the arrow https://docs.rs/arrow/11.1.0/arrow/compute/kernels/comparison/fn.eq_dyn.html kernel (and then combining the bitmasks) might be faster for most joins (which are single column) rather than this custom comparison logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this joining logic, comparison is always operated between one row from streamed and several rows from buffered (mostly zero or one row in real-world data). i noticed that eq_dyn accepts two arrays and returns another boolean array. will it introduce extra costs to create and drop the array?

Column::new_with_schema("b1", &right.schema())?,
)];

let (_, batches) = join_collect(left, right, on, JoinType::Inner).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something I am not sure is well covered by these tests are the various corner cases of multiple batch management -- most of the tests have only a single batch of input.

I suggest adding a fuzz type test for the SortMergeJoin in the spirit of https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/tests/order_spill_fuzz.rs

That runs the same (logical) inputs through the sort merge join but randomizes the split into record batches (rather than one RecordBatch call RecordBatch::slice() and divide it up into smaller parts

Also, since the merge join is coalescing ranges, I think using slightly larger RecordBatches with multiple join keys would be valuable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe also double checking with hash join that they get the same answers would be good

let (_, batches) = join_collect(left, right, on, JoinType::Inner).await?;

let expected = vec![
"+----+----+----+----+----+----+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double checked with postgres:

alamb=# select * from l JOIN r ON (l.b1 = r.b1);
 a1 | b1 | c1 | a2 | b1 | c2 
----+----+----+----+----+----
  1 |  4 |  7 | 10 |  4 | 70
  2 |  5 |  8 | 20 |  5 | 80
  3 |  5 |  9 | 20 |  5 | 80
(3 rows)

👍

Add detailed comments of the ordering requirements of two input children.

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
};

// execute children plans
let streamed = CoalescePartitionsExec::new(streamed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get this, why are we coalescing all partitions from streamed into a single stream? Shouldn't we do a partition-wise join?

};

// execute children plans
let streamed = CoalescePartitionsExec::new(streamed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, I don't think CoalescePartitionsExec would preserve sort order, making merging with two pointers impossible.

}
}

/// Metrics for SortMergeJoinExec (Not yet implemented)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of date doc?

pub streamed_idx: usize,
/// Currrent buffered data
pub buffered_data: BufferedData,
/// (used in outer join) Is current streamed row joined at least once?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

on_streamed: Vec<Column>,
on_buffered: Vec<Column>,
join_type: JoinType,
output_buffer: Vec<Box<dyn ArrayBuilder>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible to reuse MutableRecordBatch in follow-up PRs?

cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.join_metrics.join_time.timer();
loop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice loop with state transition 👍

};
}
SMJState::Polling => {
if ![StreamedState::Exhausted, StreamedState::Ready]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

matches! macro maybe?

.iter()
.zip(batch.schema().fields())
.enumerate()
.try_for_each(|(i, (column, f))| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try_for_each result not handled?

fn append_row_to_output(
batch: &RecordBatch,
idx: usize,
arrays: &mut [Box<dyn ArrayBuilder>],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we should generalize MutableRecordBatch with many common usage patterns.

@yjshen
Copy link
Member

yjshen commented Apr 19, 2022

I've just finished my first pass of review, and the overall structure looks great to me. Nice work @richox!
I plan to re-review the polling logic carefully tomorrow morning.

@yjshen
Copy link
Member

yjshen commented Apr 19, 2022

Cc @Dandandan you might be interested in this as well.

@yjshen yjshen requested a review from Dandandan April 19, 2022 14:59
Copy link
Member

@yjshen yjshen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @richox. The work is solid and looks great to me!

As @alamb pointed out before, there are several follow-ups such as SMJ rename, join folder re-org, hash-join & merge-join consolidation, etc. We could do these in later PRs.

Thanks again for this epic work!

@yjshen yjshen merged commit 8867353 into apache:master Apr 22, 2022
@alamb
Copy link
Contributor

alamb commented Apr 22, 2022

I agree -- thank you so much @richox 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement Sort-Merge Join
3 participants