-
Notifications
You must be signed in to change notification settings - Fork 6.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve left/inner join performance by rerange right table by keys #60341
Improve left/inner join performance by rerange right table by keys #60341
Conversation
Note about that we may have sparse columns and working with them through IColumn interface is critical. |
This is an automated comment for commit 597181c with description of existing statuses. It's updated for the latest CI running ❌ Click here to open a full report in a separate page
Successful checks
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some initial comments.
src/Interpreters/RowRefs.h
Outdated
{ | ||
auto * batch = pool.alloc<Batch>(); | ||
*batch = Batch(this); | ||
batch->insert(std::move(row_ref), pool); | ||
return batch; | ||
} | ||
|
||
row_nums[size] = row_ref.row_num; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why row_nums
is required? Isn't row_nums[i]
essentially the same as row_refs[i].row_num
?
src/Interpreters/RowRefs.h
Outdated
@@ -46,6 +46,7 @@ struct RowRefList : RowRef | |||
SizeT size = 0; /// It's smaller than size_t but keeps align in Arena. | |||
Batch * next; | |||
RowRef row_refs[MAX_SIZE]; | |||
UInt64 row_nums[MAX_SIZE]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type should be ColumnIndex
src/Columns/ColumnString.cpp
Outdated
void ColumnString::insertIndicesFrom(const IColumn & src, const IColumn::ColumnIndex * selector, const size_t & size) | ||
{ | ||
for (size_t i = 0; i < size; ++i) | ||
insertFrom(src, *(selector + i)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This overload is not different from the base class method.
src/Interpreters/RowRefs.h
Outdated
|
||
void nextBatch() | ||
{ | ||
batch = batch->next; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This invalidates the existing position
, using operator ++
and nextBatch()
together is not viable.
src/Interpreters/RowRefs.h
Outdated
@@ -55,14 +56,14 @@ struct RowRefList : RowRef | |||
|
|||
Batch * insert(RowRef && row_ref, Arena & pool) | |||
{ | |||
if (full()) | |||
if (full() || (size > 0 && row_ref.block != row_refs[0].block)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new condition should be in the form a function with a human-readable name.
2e2e1d1
to
2cf4dbf
Compare
we have optimize inner join use batch insert base on the pr #58278, and on our gluten case , it can performance better. please review it , thanks . @jkartseva |
b16f0da
to
4a40b4b
Compare
4a40b4b
to
202b571
Compare
cc @jkartseva |
8b38c9c
to
f266807
Compare
Is the most recent iteration ready for review @KevinyhZou ? |
yes @jkartseva |
I assume some CI failures can be related, for example
Contains thread sanitizer report around Cloud you please check it? |
OK |
a85bf39
to
9976aeb
Compare
cc @vdimir |
insertFrom
callinsertFrom
call
e475372
to
4200820
Compare
Dear @jkartseva, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself. |
c483bb0
to
add486b
Compare
any comments about this pr ? @jkartseva |
I'll take a look by the end of the week @KevinyhZou |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should make this feature experimental. I can help with rolling it out to the staging tier in our cloud. If there are no regressions, we may deprecate the experimental flag.
{ | ||
for (size_t i = 0; i < block.columns(); ++i) | ||
{ | ||
auto & col = *(block.getByPosition(i).column->assumeMutable()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assumeMutableRef()
src/Core/Settings.h
Outdated
M(Int32, join_to_sort_perkey_rows_threshold, 40, "The lower limit of per-key average rows in the right table to determine whether to sort it in hash join.", 0) \ | ||
M(Int32, join_to_sort_table_rows_threshold, 10000, "The upper limit of rows in the right table to determine whether to sort it in hash join.", 0) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How were the 40
and 10000
thresholds selected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this feature experimental (e.g., allow_experimental_inner_join_right_table_sorting
) and provide a functional test with this setting SET
to 1
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The meaning of the thresholds is unclear without reading the code. We could consider one of the following options: updating the description to clarify how changing the setting affects user-experience (for example, using a special join method that improves performance for wide tables but increases memory consumption) or, even better, removing the thresholds and choosing the best value automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tested locally and found that if there are many rows on the right but very few matching rows, sorting will lead to performance degradation. In this scenario, I found through testing that 10000 of join_to_sort_table_rows_threshold
is a reasonable value, which means the right table is not very big and will not cause significant performance degradation due to sorting. On the contrary, if there are lots of matching rows to output, the threshold can be increased to allow a larger right table to be sorted, which can still achieve significant performance improvement.
And another threshold was set default value 40, as I test on the table when the table is not dense enough, and the sorting may also cause the performance degradation, when the threshold set up to 40, then no performance slow down. @jkartseva
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have update the description of the threshold
settings, take a look at it whether would be ok? @vdimir
@@ -115,6 +115,7 @@ class AddedColumns | |||
} | |||
join_data_avg_perkey_rows = join.getJoinedData()->avgPerKeyRows(); | |||
output_by_row_list_threshold = join.getTableJoin().outputByRowListPerkeyRowsThreshold(); | |||
join_data_sorted = join.getJoinedData()->sorted; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move this to the initialization list.
|
||
void HashJoin::tryRerangeRightTableData() | ||
{ | ||
if ((kind != JoinKind::Inner && kind != JoinKind::Left) || strictness != JoinStrictness::All || table_join->getMixedJoinExpression()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
!isInnerOrLeft(kind)
void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]]) | ||
{ | ||
constexpr JoinFeatures<KIND, STRICTNESS, Map> join_features; | ||
if constexpr (join_features.is_all_join && (join_features.left || join_features.inner)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The external function tryRerangeRightTableData
already checks these conditions.
Let's throw a LOGICAL_ERROR
if they are not satisfied here.
auto it = rows_ref.begin(); | ||
if (it.ok()) | ||
{ | ||
if (blocks.empty() || blocks.back().rows() > DEFAULT_BLOCK_SIZE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the condition be blocks.back().rows() >= DEFAULT_BLOCK_SIZE
?
kind, | ||
strictness, | ||
data->maps.front(), | ||
false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: /*prefer_use_maps_all*/ false
if (sample_block_with_columns_to_add.columns() == 0) | ||
{ | ||
LOG_DEBUG(log, "The joined right table total rows :{}, total keys :{}, columns added:{}", | ||
data->rows_to_join, data->keys_to_join, sample_block_with_columns_to_add.columns()); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please elaborate on this condition.
Also, why log sample_block_with_columns_to_add.columns()
?
data->rows_to_join, data->keys_to_join, sample_block_with_columns_to_add.columns()); | ||
return; | ||
} | ||
joinDispatch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
[[maybe_unused]] bool result = joinDispatch(...);
chassert(result);
cc @jkartseva |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks good, let's adjust the setting names (see comment) and provide cleaner descriptions, and I'll approve & merge.
src/Core/Settings.h
Outdated
@@ -922,6 +922,9 @@ class IColumn; | |||
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \ | |||
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \ | |||
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \ | |||
M(Int32, join_to_sort_perkey_rows_threshold, 40, "Rerange the right table by key in left or inner hash join when the per-key average rows of it exceed this value (means the table keys is dense) and its number of rows is not too many(controlled by `join_to_sort_table_rows_threshold`), to make the join output by the data batch of key, which would improve performance.", 0) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, a description should be more focused on the particular setting it's describing.
I think it should be reworded, e.g.:
The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys...
Also, the setting name should contain "lower" or "min".
src/Core/Settings.h
Outdated
@@ -922,6 +922,9 @@ class IColumn; | |||
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \ | |||
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \ | |||
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \ | |||
M(Int32, join_to_sort_perkey_rows_threshold, 40, "Rerange the right table by key in left or inner hash join when the per-key average rows of it exceed this value (means the table keys is dense) and its number of rows is not too many(controlled by `join_to_sort_table_rows_threshold`), to make the join output by the data batch of key, which would improve performance.", 0) \ | |||
M(Int32, join_to_sort_table_rows_threshold, 10000, "Rerange the right table by key in left or inner hash join when its number of rows not exceed this value and the table keys is dense (controlled by `join_to_sort_perkey_rows_threshold`), to make the join performance improve as output by the data batch of key, but not cost too much on the table reranging.", 0) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly:
The upper threshold of the number of rows in the right table to determine whether to rerange the right table by key in left or inner join.
Or:
The maximum number of rows in the right table...
"upper" or "max" should be in the setting name.
src/Core/SettingsChangesHistory.cpp
Outdated
{"input_format_try_infer_datetimes_only_datetime64", true, false, "Allow to infer DateTime instead of DateTime64 in data formats"}, | ||
{"join_to_sort_perkey_rows_threshold", 0, 40, "Rerange the right table by key in left or inner hash join when the per-key average rows of it exceed this value (means the table keys is dense) and its number of rows is not too many(controlled by `join_to_sort_table_rows_threshold`), to make the join output by the data batch of key, which would improve performance."}, | ||
{"join_to_sort_table_rows_threshold", 0, 10000, "Rerange the right table by key in left or inner hash join when its number of rows not exceed this value and the table keys is dense (controlled by `join_to_sort_perkey_rows_threshold`), to make the join performance improve as output by the data batch of key, but not cost too much on the table reranging."}, | ||
{"allow_experimental_join_right_table_sorting", false, false, "If it is set to true, and the conditions of `join_to_sort_perkey_rows_threshold` and `join_to_sort_perkey_rows_threshold` are met, then we will try to rerange the right table by key to improve the performance in left or inner hash join."}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove "try" from the description:
"...are met, rerange the right table by key..."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thank you for working on this.
Please update the Changelog entry with a more generalized summary. The present description is too verbose and focused on the specific case. |
done |
insertFrom
callb5289c1
Upgrade check is failing after this one in other PRs:
|
It seems this settings should be added to 24.9, and I will try to fix this. @zvonand |
@@ -922,6 +922,9 @@ class IColumn; | |||
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \ | |||
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \ | |||
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \ | |||
M(Int32, join_to_sort_minimum_perkey_rows, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys", 0) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as https://github.com/ClickHouse/ClickHouse/pull/63677/files#r1771844385. Why is this a Int32 it it's treated as unsigned and it shouldn't be negative?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the type is wrong. And I have made a pr to change the type to UInt64, #69886
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Improve the join performance by rerange the right table by keys while the table keys are dense in left or inner hash join.
Documentation entry for user-facing changes