
feat: emitting partial join results in HashJoinStream #8020

Merged
merged 20 commits into apache:main on Jan 22, 2024

Conversation

korowa
Contributor

@korowa korowa commented Nov 1, 2023

Which issue does this PR close?

Closes #7848.

Rationale for this change

The ability to produce intermediate output, without accumulating the "inner" batch until the complete probe-side batch has been joined, will help avoid unpredictable memory consumption in HashJoinStream (e.g. in the case of implicit cross joins) and allow such queries to be processed.

What changes are included in this PR?

  • JoinHashMap::get_matched_indices_with_limit_offset -- a method that returns a limited number of matched pairs from JoinHashMap, along with the starting point for the next call to this method (see the sketch after this list).
  • additional attributes in ProcessProbeBatchState required to track the progress of processing the current batch -- the state itself is mutated once per iteration.
  • build_equal_condition_join_indices is split into lookup_join_hashmap (matching and verifying that the values behind equal hashes are actually equal) and apply_join_filter_to_indices (filtering logic -- not a new function). HashJoinExec and SymmetricHashJoinExec now each have their own matching implementation -- at this point the difference between them is becoming noticeable.
  • adjust_indices_by_join_type and the functions it subsequently calls now accept a Range argument in order to perform partial adjustment.
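For illustration, here is a simplified, dependency-free sketch of the limit/offset lookup idea from the first bullet. This is not DataFusion's actual JoinHashMap or its API -- the structure and names are only meant to mirror the description above: a call returns at most `limit` matched (build, probe) index pairs and, if the probe batch is not exhausted, the position to resume from on the next call.

use std::collections::HashMap;

/// Chained hash map: `map` points at the 1-based index of the *last* build row per key,
/// `next[i]` points at the previous build row with the same key (0 terminates the chain).
struct TinyJoinMap {
    map: HashMap<u64, u64>,
    next: Vec<u64>,
}

impl TinyJoinMap {
    fn build(build_keys: &[u64]) -> Self {
        let mut map = HashMap::new();
        let mut next = vec![0u64; build_keys.len()];
        for (i, &k) in build_keys.iter().enumerate() {
            // chain the previously stored row (if any) behind the new one
            next[i] = map.insert(k, (i + 1) as u64).unwrap_or(0);
        }
        Self { map, next }
    }

    /// Collect up to `limit` (build, probe) index pairs, starting from `offset`
    /// (probe row index, optional chain position). Returns the pairs and, if the
    /// probe batch is not exhausted, the offset to resume from on the next call.
    fn get_matched_indices_with_limit_offset(
        &self,
        probe_keys: &[u64],
        offset: (usize, Option<u64>),
        limit: usize,
    ) -> (Vec<(u64, u32)>, Option<(usize, Option<u64>)>) {
        let mut pairs = Vec::with_capacity(limit);
        let (start_row, mut resume_chain) = offset;
        for row in start_row..probe_keys.len() {
            // resume inside a chain only for the first probe row of this call
            let mut idx = resume_chain
                .take()
                .unwrap_or_else(|| *self.map.get(&probe_keys[row]).unwrap_or(&0));
            while idx != 0 {
                if pairs.len() == limit {
                    // output is full: remember where to continue next time
                    return (pairs, Some((row, Some(idx))));
                }
                pairs.push((idx - 1, row as u32));
                idx = self.next[(idx - 1) as usize];
            }
        }
        (pairs, None)
    }
}

fn main() {
    let map = TinyJoinMap::build(&[1, 1, 2]); // build side: keys of rows 0, 1, 2
    let probe = [1, 2];                       // probe side: two rows
    let (first, resume) = map.get_matched_indices_with_limit_offset(&probe, (0, None), 2);
    assert_eq!(first, vec![(1, 0), (0, 0)]);  // build rows for key 1, newest first
    let (rest, done) = map.get_matched_indices_with_limit_offset(&probe, resume.unwrap(), 2);
    assert_eq!(rest, vec![(2, 1)]);
    assert!(done.is_none());
}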

Are these changes tested?

Tests for each type of join (both with and without filters) now run for multiple batch sizes, plus tests have been added validating output row/batch counts for each join type and varying batch_size.
Additionally, these modifications are covered by the join_fuzz tests.

Are there any user-facing changes?

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Nov 1, 2023
@korowa korowa marked this pull request as draft November 1, 2023 19:04
@metesynnada
Contributor

I will review it soon. Can you share the performance changes using https://github.com/apache/arrow-datafusion/tree/main/benchmarks?

@korowa
Contributor Author

korowa commented Nov 2, 2023

@metesynnada, thanks! Results for tpch_mem (10 iterations, scale=1) and tpch (5 iterations, scale=10) are

--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃     master ┃ hash_join_batch_size ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  5415.03ms │            5425.11ms │    no change │
│ QQuery 2     │   857.37ms │             930.79ms │ 1.09x slower │
│ QQuery 3     │  2550.69ms │            2668.79ms │    no change │
│ QQuery 4     │  1329.48ms │            1394.45ms │    no change │
│ QQuery 5     │  3670.07ms │            3799.92ms │    no change │
│ QQuery 6     │  1226.23ms │            1232.87ms │    no change │
│ QQuery 7     │  7879.84ms │            8178.27ms │    no change │
│ QQuery 8     │  3723.55ms │            3748.62ms │    no change │
│ QQuery 9     │  6454.01ms │            6679.66ms │    no change │
│ QQuery 10    │  3747.42ms │            3792.84ms │    no change │
│ QQuery 11    │  1061.44ms │            1118.66ms │ 1.05x slower │
│ QQuery 12    │  1876.41ms │            1908.35ms │    no change │
│ QQuery 13    │  3787.24ms │            3937.90ms │    no change │
│ QQuery 14    │  1627.99ms │            1635.65ms │    no change │
│ QQuery 15    │  1648.28ms │            1655.15ms │    no change │
│ QQuery 16    │   931.55ms │             942.41ms │    no change │
│ QQuery 17    │ 14161.80ms │           14253.68ms │    no change │
│ QQuery 18    │ 21711.26ms │           23960.37ms │ 1.10x slower │
│ QQuery 19    │  3027.67ms │            3069.92ms │    no change │
│ QQuery 20    │  3825.12ms │            3925.90ms │    no change │
│ QQuery 21    │  9527.87ms │           10463.53ms │ 1.10x slower │
│ QQuery 22    │  1010.40ms │            1043.16ms │    no change │
└──────────────┴────────────┴──────────────────────┴──────────────┘
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃   master ┃ hash_join_batch_size ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 393.17ms │             392.63ms │     no change │
│ QQuery 2     │  94.37ms │              99.65ms │  1.06x slower │
│ QQuery 3     │ 100.94ms │             105.51ms │     no change │
│ QQuery 4     │  69.70ms │              70.29ms │     no change │
│ QQuery 5     │ 227.66ms │             231.48ms │     no change │
│ QQuery 6     │  32.18ms │              30.46ms │ +1.06x faster │
│ QQuery 7     │ 461.49ms │             491.93ms │  1.07x slower │
│ QQuery 8     │ 159.03ms │             163.71ms │     no change │
│ QQuery 9     │ 337.22ms │             360.54ms │  1.07x slower │
│ QQuery 10    │ 190.43ms │             195.71ms │     no change │
│ QQuery 11    │ 111.48ms │             117.54ms │  1.05x slower │
│ QQuery 12    │  87.90ms │              88.86ms │     no change │
│ QQuery 13    │ 179.83ms │             187.62ms │     no change │
│ QQuery 14    │  35.45ms │              29.73ms │ +1.19x faster │
│ QQuery 15    │  35.02ms │              39.34ms │  1.12x slower │
│ QQuery 16    │  82.82ms │              89.14ms │  1.08x slower │
│ QQuery 17    │ 616.74ms │             583.82ms │ +1.06x faster │
│ QQuery 18    │ 723.74ms │             813.82ms │  1.12x slower │
│ QQuery 19    │  98.08ms │              97.18ms │     no change │
│ QQuery 20    │ 192.25ms │             192.46ms │     no change │
│ QQuery 21    │ 576.40ms │             609.72ms │  1.06x slower │
│ QQuery 22    │  59.03ms │              58.96ms │     no change │
└──────────────┴──────────┴──────────────────────┴───────────────┘

I guess before marking this PR as "ready for review" I'll add some unit tests for hash join and investigate the reasons for the slowdown in q18 and q21.

Contributor

@alamb alamb left a comment

Thank you @korowa and @metesynnada

I am also running the performance benchmarks on a GCP machine and I will post the results here when they are ready

I wonder if it would be possible to encapsulate some of these functions in a struct, something like

struct JoinOutputBuilder {
 ...
}

That might allow a more efficient encoding of the current join state rather than having to pass around Range<usize> perhaps 🤔

0 0 3 0
0 0 6 0
0 0 20 0
1 3 95 0
Contributor

Why did this output change? Is it because the order on t2.a is a tie?

Contributor Author

@korowa korowa Nov 3, 2023

Well, yes -- this query result is ordered only by t2, with random order on t1, and the current behaviour of the indices-matching function is to iterate over inverted probe-side indices and attach build-side indices to them (the HashMap + Vec data structure also emits build-side indices in reverse order); after matching the whole probe side, the resulting arrays are inverted again -- this allows returning right/left side indices in their natural order.

In the case of partial output, I can't see any other option besides iterating the probe side in its natural order (otherwise the record order would be broken, as there is no "full batch" anymore to re-sort), but at the same time the build side is stored in the same data structure in reverse order.

So, it's a side effect -- hash join still maintains probe-side input order, but no longer the build-side order (I guess it could potentially be restored by tweaking the collect_build_side function) -- that's why the t1 order in this query result is inverted now.
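To make the ordering effect concrete, here is a tiny runnable illustration (hypothetical values only, not the actual join code):

fn main() {
    // With three build rows sharing one key, chain traversal yields them newest-first:
    let matched_build: Vec<u32> = vec![2, 1, 0];
    // The previous "full batch" implementation reversed the final arrays at the end,
    // restoring the natural build order:
    let mut full_batch = matched_build.clone();
    full_batch.reverse();
    assert_eq!(full_batch, vec![0, 1, 2]);
    // With partial output there is no single final array left to reverse, so each emitted
    // chunk keeps the newest-first build order -- hence the flipped t1 order in this test.
    assert_eq!(matched_build, vec![2, 1, 0]);
}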

Contributor

I don't think the build side order is important -- maybe we should just update the test so it has a well-defined deterministic output 🤔

Contributor

@metesynnada metesynnada Nov 7, 2023

I believe preserving the order of the build side is important for further aggregations. We can avoid unnecessary sorting by leveraging the build-side lexical order. This pattern is common in DataFusion and warrants investing in order preservation.

What is the cost of maintaining the build side order?

Contributor Author

I agree that build-side order is not that important (at least for the majority of cases -- it will be affected by the probe-side ordering anyway), but on the other hand it's reasonable not to break the current behaviour.

At first glance, the cost is an increase in join_build_time caused by switching the current JoinHashMap building process from LIFO (last value stored in the map) to FIFO (first value stored in the map) -- it'll probably require more traversals over JoinHashMap.next to place all the elements, and will affect joins with a high share of non-unique values on the build side.

Contributor Author

@korowa korowa Nov 8, 2023

@metesynnada, my bad -- I didn't notice those special cases in calculate_join_output_ordering for inner joins.

I guess I'll do some research on this one and the ideas related to it.

Thank you!

@korowa
Contributor Author

korowa commented Nov 3, 2023

Thank you @alamb, but I don't think this PR is worth benchmarking yet -- after running the current unit tests with different batch sizes, I've found that the arguments for adjust_indices_by_join_type are not really correct in this version, and it works incorrectly with join filters, so I suppose I need a bit more time to figure it out 😞
And yes, in this context, your suggestion about placing the state management logic into a separate JoinOutputBuilder struct is definitely worthwhile -- I'll look for options for separating state from other join internals.

@alamb
Contributor

alamb commented Nov 4, 2023

definitely worthwhile -- I'll look for options for separating state from other join internals.

Thank you.

This particular issue has become a higher priority for us at InfluxData (because we are sometimes hitting it internally). I may spend some time working on it myself this upcoming week (e.g. maybe encapsulating the join output generation into a struct as a first step 🤔).

@korowa korowa force-pushed the hash_join_batch_size branch 2 times, most recently from cba8b0f to 447180d Compare November 4, 2023 15:04
@korowa
Contributor Author

korowa commented Nov 4, 2023

@alamb, got it!

Anyway, the current version works as expected, so I'll mark it as ready for review. Regarding logic / encapsulation -- I've moved all the index-tracking work into HashJoinStreamState, which provides methods to update it and to retrieve the state for the next iteration / the range that should be adjusted (still not an output builder, though on the plus side it doesn't affect the current processing significantly).

@korowa korowa marked this pull request as ready for review November 4, 2023 16:11
@korowa korowa changed the title WIP: feat: emitting partial join results in HashJoinStream feat: emitting partial join results in HashJoinStream Nov 4, 2023
@alamb
Contributor

alamb commented Nov 6, 2023

Thanks @korowa -- I'll review this PR later today.

Contributor

@alamb alamb left a comment

Thank you @korowa. I reviewed the code and tests carefully and it is looking really nice to me.

I left some style suggestions, but I don't think anything is required prior to merge. However, given that I am not as familiar with the join code, it would be awesome if @liukun4515, @mingmwang, or @Dandandan could also give this PR a review to ensure we aren't missing something.

I am running the tpch benchmark tests now to confirm it doesn't introduce any regression in performance.

I am also testing this against our internal reproducer to see whether this PR fixes the issue we were seeing.

Again, really nice work


#[template]
#[rstest]
fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
Contributor

This is a nice way to test the logic
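For reference, a minimal usage sketch of how such a template is applied (assuming the rstest_reuse crate is in scope; the test name and body here are hypothetical) -- the attribute expands the test into one case per batch size listed in the template above:

use rstest_reuse::apply;

// Hypothetical test: runs once for each value declared in the `batch_sizes` template.
#[apply(batch_sizes)]
fn hypothetical_join_test(batch_size: usize) {
    assert!(batch_size >= 1);
}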

@@ -747,6 +753,97 @@ where
Ok(())
}

// State for storing left/right side indices used for partial batch output
Contributor

Maybe as a follow-on PR we can move this structure into its own module (perhaps something like datafusion/physical-plan/src/joins/join_output.rs)

datafusion/physical-plan/src/joins/hash_join.rs (two outdated review threads, resolved)
(0..row_count)
.filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u32))
(range)
.filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u32))
.collect::<UInt32Array>()
}

/// Get unmatched and deduplicated indices
pub(crate) fn get_anti_u64_indices(
Contributor

get_anti_u64_indices is always called with a range of 0.. -- what is the reason for changing it to take a Range<usize>?

Contributor Author

@korowa korowa Nov 7, 2023

The initial intention was to keep the function signatures uniform for get_anti_indices and get_anti_u64_indices, but (since they duplicate each other) I've now made get_anti_indices a generic function -- so there is no need for the u64 variation anymore.
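As a rough, dependency-free sketch of the idea (not the actual DataFusion signature, which operates on Arrow bitmaps and arrays), a single function generic over the output index type can replace the separate u32/u64 variants:

use std::ops::Range;

// `bitmap[i]` marks whether position `range.start + i` already produced a match.
fn get_anti_indices<T: TryFrom<usize>>(range: Range<usize>, bitmap: &[bool]) -> Vec<T> {
    let offset = range.start;
    range
        .filter(|idx| !bitmap[idx - offset]) // keep only unmatched positions
        .filter_map(|idx| T::try_from(idx).ok())
        .collect()
}

fn main() {
    // rows 2 and 3 of the probe range [2, 6) were matched -> anti indices are 4 and 5
    let anti: Vec<u32> = get_anti_indices(2..6, &[true, true, false, false]);
    assert_eq!(anti, vec![4, 5]);
}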

(0..row_count)
.filter_map(|idx| (bitmap.get_bit(idx)).then_some(idx as u32))
(range)
.filter_map(|idx| (bitmap.get_bit(idx - offset)).then_some(idx as u32))
.collect::<UInt32Array>()
}

/// Get matched and deduplicated indices
pub(crate) fn get_semi_u64_indices(
Contributor

Likewise, get_semi_u64_indices never seems to be called with a range that doesn't start with zero

Contributor Author

Same as above

datafusion/physical-plan/src/joins/hash_join.rs (outdated review thread, resolved)
matched_indices_updated: bool,
// tracking last joined probe side index seen for further indices adjustment
last_joined_probe_index: Option<usize>,
// tracking last joined probe side index seen for further indices adjustment
Contributor

This comment is the same as above -- can you clarify what the difference is between "last" and "prev"?

Contributor Author

Comment has been updated

prev_joined_probe_index: Option<usize>,
}

impl HashJoinStreamState {
Contributor

Another way to represent a state machine like this in Rust is via an enum. The upside is that it makes it more explicit what fields are used in what states and what transitions are allowed

Something like this perhaps

enum HashJoinOutputState {
  /// The hash table is still being built
  Hashing,
  /// The hash table has been built, and a batch of `probe_rows` is being output,
  /// but nothing has been output yet
  Begin {
    probe_rows: usize
  },
  /// Have output up to `last_matched_indices`
  Output {
    // saved probe-build indices to resume matching from
    last_matched_indices: Option<(usize, usize)>,
  } 
 /// ....
}

Contributor

I strongly advise making this an enum; this is a common approach in other executors as well.

@alamb
Contributor

alamb commented Nov 6, 2023

Here are my benchmark results. My conclusion is that there is no significant performance change (the faster/slower is within the realm of noise). I will try running at a larger scale to see if anything shows up

--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ hash_join_batch_size ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  928.44ms │             930.39ms │     no change │
│ QQuery 2     │  210.54ms │             222.96ms │  1.06x slower │
│ QQuery 3     │  454.08ms │             463.21ms │     no change │
│ QQuery 4     │  248.99ms │             280.77ms │  1.13x slower │
│ QQuery 5     │  589.44ms │             581.81ms │     no change │
│ QQuery 6     │  219.45ms │             223.18ms │     no change │
│ QQuery 7     │  925.05ms │            1063.70ms │  1.15x slower │
│ QQuery 8     │  675.09ms │             717.43ms │  1.06x slower │
│ QQuery 9     │ 1052.59ms │            1050.42ms │     no change │
│ QQuery 10    │  712.75ms │             688.61ms │     no change │
│ QQuery 11    │  212.84ms │             200.64ms │ +1.06x faster │
│ QQuery 12    │  362.82ms │             359.72ms │     no change │
│ QQuery 13    │  560.56ms │             580.54ms │     no change │
│ QQuery 14    │  322.74ms │             305.90ms │ +1.06x faster │
│ QQuery 15    │  232.62ms │             235.21ms │     no change │
│ QQuery 16    │  184.60ms │             188.35ms │     no change │
│ QQuery 17    │ 1218.95ms │            1166.47ms │     no change │
│ QQuery 18    │ 1741.70ms │            1742.98ms │     no change │
│ QQuery 19    │  566.74ms │             551.15ms │     no change │
│ QQuery 20    │  482.46ms │             456.34ms │ +1.06x faster │
│ QQuery 21    │ 1388.65ms │            1408.79ms │     no change │
│ QQuery 22    │  171.23ms │             165.01ms │     no change │
└──────────────┴───────────┴──────────────────────┴───────────────┘
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ hash_join_batch_size ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  588.55ms │             574.48ms │     no change │
│ QQuery 2     │  154.73ms │             163.05ms │  1.05x slower │
│ QQuery 3     │  152.65ms │             154.83ms │     no change │
│ QQuery 4     │  109.42ms │             110.63ms │     no change │
│ QQuery 5     │  379.86ms │             391.93ms │     no change │
│ QQuery 6     │   39.26ms │              39.47ms │     no change │
│ QQuery 7     │  885.93ms │             898.37ms │     no change │
│ QQuery 8     │  277.20ms │             279.22ms │     no change │
│ QQuery 9     │  565.02ms │             548.77ms │     no change │
│ QQuery 10    │  311.39ms │             304.02ms │     no change │
│ QQuery 11    │  172.35ms │             172.79ms │     no change │
│ QQuery 12    │  144.86ms │             152.07ms │     no change │
│ QQuery 13    │  241.93ms │             253.64ms │     no change │
│ QQuery 14    │   48.83ms │              45.23ms │ +1.08x faster │
│ QQuery 15    │   45.07ms │              46.21ms │     no change │
│ QQuery 16    │  150.89ms │             160.37ms │  1.06x slower │
│ QQuery 17    │  751.06ms │             763.63ms │     no change │
│ QQuery 18    │ 1349.89ms │            1498.83ms │  1.11x slower │
│ QQuery 19    │  144.74ms │             150.76ms │     no change │
│ QQuery 20    │  305.43ms │             312.09ms │     no change │
│ QQuery 21    │ 1026.75ms │            1060.09ms │     no change │
│ QQuery 22    │   90.80ms │              87.23ms │     no change │
└──────────────┴───────────┴──────────────────────┴───────────────┘

Contributor

@alamb alamb left a comment

BTW I have verified that this change fixes our problem upstream (memory no longer grows without bound).

Thank you again @korowa

Contributor

@metesynnada metesynnada left a comment

It seems that there is room for improvement in the code. One suggestion is to handle state management and the output-producing mechanisms in common structs, which could make the code easier to understand. Another idea is an explanatory ASCII diagram for future maintainers. Additionally, for SymmetricHashJoin, it is recommended not to use HashJoinStreamState inside the SHJ file. Instead, it would be cleaner to copy the previous code into SHJ and keep them separate.

@korowa
Contributor Author

korowa commented Nov 7, 2023

@alamb @metesynnada thank you!

I've addressed some comments; just to sum up, the remaining (major) ones I'm going to work on are:

  • completely retain the SHJ logic -- I've been considering this option, but the code duplication confused me a bit. However, if it's not a problem, I'll revert all the changes regarding SHJ
  • maintain the original build-side sort order (order by probe.key, build.key -- as it was before) -- going to check out an approach that modifies build-side collection & updates the JoinHashMap
  • enum-based / more straightforward and readable state management -- going to look for a neater implementation (thanks for the guidance!)

@korowa korowa marked this pull request as ready for review January 14, 2024 16:57
@korowa
Contributor Author

korowa commented Jan 14, 2024

cc @alamb @metesynnada

I believe this version is ready for review. Could you please take a look when you have some time?

Just in case -- tpch_mem results for this version are

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃   master ┃ hash_join_batch_size ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ 219.32ms │             222.32ms │    no change │
│ QQuery 2     │  41.97ms │              42.19ms │    no change │
│ QQuery 3     │  98.34ms │              96.49ms │    no change │
│ QQuery 4     │  91.24ms │              93.54ms │    no change │
│ QQuery 5     │ 158.88ms │             159.20ms │    no change │
│ QQuery 6     │  25.88ms │              26.33ms │    no change │
│ QQuery 7     │ 349.07ms │             373.73ms │ 1.07x slower │
│ QQuery 8     │  69.36ms │              72.65ms │    no change │
│ QQuery 9     │ 158.26ms │             159.75ms │    no change │
│ QQuery 10    │ 160.81ms │             164.02ms │    no change │
│ QQuery 11    │  48.31ms │              47.87ms │    no change │
│ QQuery 12    │  73.05ms │              72.87ms │    no change │
│ QQuery 13    │ 113.89ms │             118.59ms │    no change │
│ QQuery 14    │  27.29ms │              30.08ms │ 1.10x slower │
│ QQuery 15    │  45.36ms │              46.13ms │    no change │
│ QQuery 16    │  55.31ms │              55.95ms │    no change │
│ QQuery 17    │ 156.56ms │             162.25ms │    no change │
│ QQuery 18    │ 476.42ms │             527.85ms │ 1.11x slower │
│ QQuery 19    │  64.40ms │              69.18ms │ 1.07x slower │
│ QQuery 20    │ 122.47ms │             127.80ms │    no change │
│ QQuery 21    │ 346.16ms │             361.06ms │    no change │
│ QQuery 22    │  42.12ms │              43.32ms │    no change │
└──────────────┴──────────┴──────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (master)                 │ 2944.46ms │
│ Total Time (hash_join_batch_size)   │ 3073.18ms │
│ Average Time (master)               │  133.84ms │
│ Average Time (hash_join_batch_size) │  139.69ms │
│ Queries Faster                      │         0 │
│ Queries Slower                      │         4 │
│ Queries with No Change              │        18 │
└─────────────────────────────────────┴───────────┘

@alamb
Contributor

alamb commented Jan 14, 2024

Thanks @korowa - I will try to review this carefully tomorrow (though it is a holiday so I may not get to it until Tuesday)

Contributor

@alamb alamb left a comment

Thank you @korowa -- I went through this PR carefully and it looks really nice to me. I had some small style / comment suggestions, but my major concern is the reported slowdown.

I am running the tpch_mem benchmarks myself now to see if I can reproduce your results, and I left some thoughts about potential places where the regression may have come from.

@metesynnada can you please review this PR again to ensure it preserves the invariants needed from SymmetricHashJoin? All the tests are passing so it seems to me like we should be good, but a second pair of eyes would probably be good

Also cc @Dandandan for your thoughts

Here are my benchmark results. They are similar to, but less pronounced than, yours:

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ hash_join_batch_size ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  217.67ms │             211.80ms │    no change │
│ QQuery 2     │   48.00ms │              47.94ms │    no change │
│ QQuery 3     │   83.12ms │              80.45ms │    no change │
│ QQuery 4     │   76.42ms │              77.78ms │    no change │
│ QQuery 5     │  128.00ms │             128.13ms │    no change │
│ QQuery 6     │   17.59ms │              18.28ms │    no change │
│ QQuery 7     │  313.54ms │             336.00ms │ 1.07x slower │
│ QQuery 8     │   85.88ms │              86.11ms │    no change │
│ QQuery 9     │  129.79ms │             131.18ms │    no change │
│ QQuery 10    │  160.24ms │             162.07ms │    no change │
│ QQuery 11    │   36.02ms │              35.37ms │    no change │
│ QQuery 12    │   71.54ms │              73.88ms │    no change │
│ QQuery 13    │   82.46ms │              87.87ms │ 1.07x slower │
│ QQuery 14    │   27.55ms │              27.73ms │    no change │
│ QQuery 15    │   65.98ms │              66.24ms │    no change │
│ QQuery 16    │   47.58ms │              47.02ms │    no change │
│ QQuery 17    │  176.15ms │             175.04ms │    no change │
│ QQuery 18    │  446.95ms │             487.56ms │ 1.09x slower │
│ QQuery 19    │   64.93ms │              63.96ms │    no change │
│ QQuery 20    │  119.30ms │             116.42ms │    no change │
│ QQuery 21    │  367.59ms │             369.21ms │    no change │
│ QQuery 22    │   31.19ms │              30.48ms │    no change │
└──────────────┴───────────┴──────────────────────┴──────────────┘

datafusion/physical-plan/src/joins/hash_join.rs (outdated review thread, resolved)
datafusion/physical-plan/src/joins/symmetric_hash_join.rs (outdated review thread, resolved)
struct ProcessProbeBatchState {
/// Current probe-side batch
batch: RecordBatch,
/// Matching offset
Contributor

What does this match? The build side offset that is matched?

Contributor Author

I've updated the comment -- it now indicates that this attribute is used as the starting offset for lookups against the join hashmap.


matched_probe.as_slice_mut().reverse();
matched_build.as_slice_mut().reverse();
let mut hashes_buffer = vec![0; probe_batch.num_rows()];
Contributor

Maybe this is one source of potential slowdown -- the hashes buffer is reallocated each time? Maybe we could change it to be reused as it was previously?

Contributor

Here is a proposal of how to do so: korowa#169

Contributor Author

Looks reasonable -- let me merge it and check whether we can go even further and compute hashes right after fetching the probe batch (once per batch, not per lookup iteration) -- if hashing is the major source of the perf degradation, it should help significantly.
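A rough sketch of the buffer-reuse idea being discussed (the names and structure here are hypothetical, not the actual HashJoinStream code): keep one hashes buffer on the stream, fill it once per fetched probe batch, and let every partial-output iteration over that batch read from it.

struct ProbeHashes {
    // reused across probe batches; resized in place instead of reallocated per lookup
    hashes_buffer: Vec<u64>,
}

impl ProbeHashes {
    // `hash_row` stands in for whatever per-row hashing the join actually performs
    fn on_new_probe_batch(&mut self, probe_rows: usize, hash_row: impl Fn(usize) -> u64) {
        self.hashes_buffer.clear();
        self.hashes_buffer.resize(probe_rows, 0);
        for (i, h) in self.hashes_buffer.iter_mut().enumerate() {
            *h = hash_row(i);
        }
    }

    // subsequent partial-output iterations over the same batch reuse the stored values
    fn hashes(&self) -> &[u64] {
        &self.hashes_buffer
    }
}

fn main() {
    let mut state = ProbeHashes { hashes_buffer: Vec::new() };
    state.on_new_probe_batch(4, |row| row as u64 * 31);
    assert_eq!(state.hashes(), &[0u64, 31, 62, 93]);
}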

Contributor Author

Hash value precalculation still doesn't recover the performance; I guess I need a bit more time to investigate it. 😞

Contributor

I plan to do some profiling today and report my findings.

Contributor

I spent some time profiling and it looks like the additional time is spent in get_matched_indices_with_limit_offset (likely unsurprising). I wonder if there are some obvious wins there (maybe use / reuse Vecs rather than buffer builders to avoid so many reallocations?).

Methodology

Comparing be361fd (which is the merge-base) to b631c1e

Build like this:

git checkout `git merge-base HEAD apache/main` # be361fdd8079a2f44da70f6af6e9d8eb3f7d0020
cargo build --profile release-nonlto --bin dfbench
cp target/release-nonlto/dfbench ~/Downloads/dfbench-be361fd
gh pr checkout https://github.com/apache/arrow-datafusion/pull/8020
cargo build --profile release-nonlto --bin dfbench
cp target/release-nonlto/dfbench ~/Downloads/dfbench-hash_join_batch_size

link the data

~/Downloads$ ln -s /Users/andrewlamb/Software/arrow-datafusion/benchmarks

Run query 18:

./dfbench-hash_join_batch_size tpch --iterations 5 --path /Users/andrewlamb/Software/arrow-datafusion/benchmarks/data/tpch_sf1 -m --format parquet --query 18

This PR
[profiler screenshot, 2024-01-16 7:21 AM]

main
[profiler screenshot, 2024-01-16 7:25 AM]

Contributor Author

After precalculating hashes (a minor cause of the degradation, ~10ms) and removing all iterator-related constructions from the lookup function (this function was indeed the major cause of the slowdown, ~30-40ms), I was able to obtain the following results for q18:

Comparing master and hash_join_batch_size
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃   master ┃ hash_join_batch_size ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 18    │ 508.30ms │             510.62ms │ no change │
└──────────────┴──────────┴──────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                   ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (master)                 │ 508.30ms │
│ Total Time (hash_join_batch_size)   │ 510.62ms │
│ Average Time (master)               │ 508.30ms │
│ Average Time (hash_join_batch_size) │ 510.62ms │
│ Queries Faster                      │        0 │
│ Queries Slower                      │        0 │
│ Queries with No Change              │        1 │
└─────────────────────────────────────┴──────────┘
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃   master ┃ hash_join_batch_size ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 18    │ 455.00ms │             476.05ms │ no change │
└──────────────┴──────────┴──────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                   ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (master)                 │ 455.00ms │
│ Total Time (hash_join_batch_size)   │ 476.05ms │
│ Average Time (master)               │ 455.00ms │
│ Average Time (hash_join_batch_size) │ 476.05ms │
│ Queries Faster                      │        0 │
│ Queries Slower                      │        0 │
│ Queries with No Change              │        1 │
└─────────────────────────────────────┴──────────┘

(with --iterations 50 --partitions 4 --query 18), which shows a ~20-30ms speedup over what I've seen before for tpch_mem, but I don't have an exact explanation for it yet. If you have (or will have) any additional comments / ideas / thoughts, that would be great.

});
}
// apply join filter if exists
let (left_indices, right_indices) = if let Some(filter) = &self.filter {
Contributor

I think your changes to this code have made it easier to read. Thank you

)?;

self.join_metrics.output_batches.add(1);
self.join_metrics.output_rows.add(state.batch.num_rows());
Contributor

Shouldn't the output rows be the number of rows in the result, rather than in the entire batch? If it is the entire batch, the output rows will be counted multiple times, I think.

Contributor Author

It definitely should, thank you.

korowa and others added 3 commits January 15, 2024 21:11
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Reuse hashes buffer when emitting partial join results
let (initial_idx, initial_next_idx) = offset;
'probe: for (row_idx, hash_value) in iter.skip(initial_idx) {
let index = if initial_next_idx.is_some() && row_idx == initial_idx {
// If `initial_next_idx` is zero, then input index has been processed
Contributor

We might combine this logic with next==0 in the loop below?

@metesynnada
Contributor

I forgot to review this PR even though it was on my shortlist, will review it tomorrow.

Contributor

@metesynnada metesynnada left a comment

When the filter equality generates too much data (close to a cross join), you stop pulling data from the probe side until the current probe results have been produced over a couple of steps, which is nice. I think this is a well-thought-out PR -- congrats, I'm sold on the idea 😀.

  • You should put more effort into readability, particularly in process_probe_batch. You could divide the logic into several methods; there are too many things happening in a single method.

  • Maybe you can also think about moving the offset handling out of the state methods; otherwise it becomes hard to incrementally add features to the join operators.

Other than that, I think the structural integrity of the algorithm is quite fascinating.

Not exactly about this PR, but in the collect_build_side method the build_timer will restart if the left_data future returns Pending. I missed it in the previous PR. Perhaps we can open an issue.

@@ -665,6 +668,8 @@ impl ExecutionPlan for HashJoinExec {
reservation,
state: HashJoinStreamState::WaitBuildSide,
build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
batch_size,
Contributor

I am not comfortable with using the batch size as the limit, since it does not take memory into account. However, I am unsure of how to pick a heuristic value.

Contributor Author

I suppose batch_size is the best fit out of all the settings DF has at the moment -- there is no setting (AFAIK) like batch_size_bytes, or anything like adaptive batching based on memory size, so I suppose this logic is more or less aligned with the other operators.

timer.done();

self.state = HashJoinStreamState::FetchProbeBatch;
if probe_batch_scanned {
Contributor

For state machine handling, please take a look at the EagerJoinStream trait. It encapsulates all of the build- and probe-side data pulling and state management, which means state handling and the join calculations (inside the implementors of the trait) are visibly separated. It makes reading quite easy. You could follow a layered approach like EagerJoinStream and SHJ -- meaning that the methods which calculate joins do not alter the state.

@alamb
Contributor

alamb commented Jan 21, 2024

My plan for this PR is to stare at it a bit more under the profiler and try and get back some of the lost performance so we can feel better about merging it

@alamb
Contributor

alamb commented Jan 21, 2024

I reran the benchmarks and my conclusion is that this branch now shows no change in performance 🚀

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ hash_join_batch_size ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  207.99ms │             211.40ms │     no change │
│ QQuery 2     │   45.81ms │              45.95ms │     no change │
│ QQuery 3     │   76.38ms │              76.74ms │     no change │
│ QQuery 4     │   72.99ms │              75.40ms │     no change │
│ QQuery 5     │  120.53ms │             121.80ms │     no change │
│ QQuery 6     │   16.81ms │              16.51ms │     no change │
│ QQuery 7     │  308.28ms │             317.97ms │     no change │
│ QQuery 8     │   82.58ms │              81.75ms │     no change │
│ QQuery 9     │  124.32ms │             125.46ms │     no change │
│ QQuery 10    │  152.78ms │             154.17ms │     no change │
│ QQuery 11    │   35.34ms │              34.39ms │     no change │
│ QQuery 12    │   71.18ms │              68.53ms │     no change │
│ QQuery 13    │   88.17ms │              82.83ms │ +1.06x faster │
│ QQuery 14    │   26.07ms │              26.46ms │     no change │
│ QQuery 15    │   62.80ms │              62.76ms │     no change │
│ QQuery 16    │   46.19ms │              45.84ms │     no change │
│ QQuery 17    │  148.89ms │             152.50ms │     no change │
│ QQuery 18    │  416.13ms │             435.87ms │     no change │
│ QQuery 19    │   62.67ms │              61.54ms │     no change │
│ QQuery 20    │  120.45ms │             113.52ms │ +1.06x faster │
│ QQuery 21    │  335.83ms │             366.62ms │  1.09x slower │
│ QQuery 22    │   31.35ms │              29.72ms │ +1.05x faster │
└──────────────┴───────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main_base)              │ 2653.54ms │
│ Total Time (hash_join_batch_size)   │ 2707.72ms │
│ Average Time (main_base)            │  120.62ms │
│ Average Time (hash_join_batch_size) │  123.08ms │
│ Queries Faster                      │         3 │
│ Queries Slower                      │         1 │
│ Queries with No Change              │        18 │
└─────────────────────────────────────┴───────────┘

Nice work @korowa

@alamb
Contributor

alamb commented Jan 21, 2024

Actually, I should say "really nice work"

@metesynnada
Contributor

Hi @korowa, I think there is a good improvement here.

Do you think we can easily adapt the same mechanism to nested loop join? This mechanism should be there as well -- do you think we can do it without code duplication? If so, we can merge this PR and open an issue for nested loop join.

@alamb
Contributor

alamb commented Jan 22, 2024

Hi @korowa, I think there is a good improvement here.

Do you think we can easily adapt the same mechanism to nested loop join? This mechanism should be there as well -- do you think we can do it without code duplication? If so, we can merge this PR and open an issue for nested loop join.

Good call @metesynnada -- I have filed #8952 to track this issue

@alamb alamb merged commit f2e6701 into apache:main Jan 22, 2024
22 checks passed
@alamb
Contributor

alamb commented Jan 22, 2024

Thanks again to everyone for their reviews, and to @korowa for sticking with this. It is both really appreciated and a really nice PR.

Successfully merging this pull request may close these issues.

HashJoinStream memory tracking insufficient