executor: refine hash join v2 for spill #55790
Conversation
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
Signed-off-by: xufei <xufei@pingcap.com>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #55790 +/- ##
================================================
+ Coverage 73.0085% 73.4534% +0.4449%
================================================
Files 1584 1600 +16
Lines 443036 459190 +16154
================================================
+ Hits 323454 337291 +13837
- Misses 99870 101902 +2032
- Partials 19712 19997 +285
Flags with carried forward coverage won't be shown.
/cc @windtalker @XuHuaiyu
select {
case <-doneCh:
	syncerDone(fetcherAndWorkerSyncer)
why not put syncerDone as a defer function?
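A minimal Go sketch of the two shapes under discussion; fetcherAndWorkerSyncer is assumed here to be a *sync.WaitGroup and syncerDone a thin wrapper around Done(), which may not match the actual PR code:

package main

import "sync"

// syncerDone marks one unit of work as finished on the syncer.
// In this sketch the syncer is assumed to be a *sync.WaitGroup.
func syncerDone(syncer *sync.WaitGroup) {
	syncer.Done()
}

// explicitCall mirrors the diff: syncerDone runs inside the select branch,
// so it only runs when doneCh fires on that path.
func explicitCall(doneCh <-chan struct{}, syncer *sync.WaitGroup) {
	select {
	case <-doneCh:
		syncerDone(syncer)
	}
}

// deferredCall is the reviewer's suggestion: defer guarantees syncerDone runs
// on every return path, including early returns and panics.
func deferredCall(doneCh <-chan struct{}, syncer *sync.WaitGroup) {
	defer syncerDone(syncer)
	<-doneCh
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	done := make(chan struct{})
	close(done)
	explicitCall(done, &wg)
	deferredCall(done, &wg)
	wg.Wait()
}

The deferred form releases the syncer on every exit path, which is what the suggestion buys.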
	return nil
} else if joinResult.err != nil || (joinResult.chk != nil && joinResult.chk.NumRows() > 0) {
	w.HashJoinCtx.joinResultCh <- joinResult
} else if joinResult.chk != nil && joinResult.chk.NumRows() == 0 {
when will this happen?
This logic is from the original hash join. Maybe this happens when no rows are successfully joined.
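A minimal sketch of the branching being discussed, with simplified stand-ins for hashjoinWorkerResult and chunk.Chunk (the real types carry much more state):

package main

import "fmt"

// joinResultT is a trimmed stand-in for the probe worker's join result.
type joinResultT struct {
	err error
	chk *chunkT
}

type chunkT struct{ rows int }

func (c *chunkT) NumRows() int { return c.rows }

// handleResult mirrors the branches in the diff above: errors and non-empty
// chunks are sent to the result channel, while an empty chunk is silently
// dropped (the case the reviewer asks about).
func handleResult(r *joinResultT, resultCh chan<- *joinResultT) {
	if r.err != nil || (r.chk != nil && r.chk.NumRows() > 0) {
		resultCh <- r
	} else if r.chk != nil && r.chk.NumRows() == 0 {
		// No rows were successfully joined for this probe chunk,
		// so there is nothing to hand to the reader.
	}
}

func main() {
	resultCh := make(chan *joinResultT, 1)
	handleResult(&joinResultT{chk: &chunkT{rows: 0}}, resultCh)
	fmt.Println("pending results:", len(resultCh)) // 0: the empty chunk was not sent
}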
builder.rowNumberInCurrentRowTableSeg[partitionID] = 0
failpoint.Inject("finalizeCurrentSegPanic", nil)
seg.finalized = true
htc.memoryTracker.Consume(seg.totalUsedBytes())
if needConsume {
when will needConsume be false?
When spill is triggered, needConsume will be false.
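A hedged sketch of that behaviour: when spill is triggered the segment's bytes go to disk rather than staying in memory, so the tracker is not charged. The helper and flag wiring here are illustrative, not the PR's exact code:

package main

import "fmt"

// memTracker is a stand-in for TiDB's memory.Tracker used in the diff.
type memTracker struct{ consumed int64 }

func (t *memTracker) Consume(bytes int64) { t.consumed += bytes }

// finalizeSegment sketches the reply: when spill has been triggered the
// segment's bytes are written to disk, so the in-memory tracker must not
// account for them, hence needConsume is false.
func finalizeSegment(tracker *memTracker, segBytes int64, spillTriggered bool) {
	needConsume := !spillTriggered
	if needConsume {
		tracker.Consume(segBytes)
	}
}

func main() {
	t := &memTracker{}
	finalizeSegment(t, 1024, false) // normal path: memory is tracked
	finalizeSegment(t, 1024, true)  // spill path: tracking is skipped
	fmt.Println("tracked bytes:", t.consumed) // 1024
}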
htc.rowTables = nil
return totalSegmentCnt

htc.clearAllSegmentsInRowTable()
why keep htc.rowTables?
Because htc.rowTables may be used in the next restore round.
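A rough sketch of the difference, using simplified stand-ins for the row table structures: the segments are dropped, but the rowTables slice itself survives for the next restore round:

package main

import "fmt"

// rowTableSegment and rowTable are simplified stand-ins for the structures in
// the diff; the real types carry far more state.
type rowTableSegment struct{ totalBytes int64 }

type rowTable struct{ segments []*rowTableSegment }

type hashTableContext struct{ rowTables []*rowTable }

// clearAllSegmentsInRowTable mirrors the intent described in the reply: drop
// the segments (their data has been spilled or consumed) but keep the
// rowTables slice itself, because the next restore round reuses it.
func (htc *hashTableContext) clearAllSegmentsInRowTable() {
	for _, rt := range htc.rowTables {
		if rt != nil {
			rt.segments = rt.segments[:0]
		}
	}
}

func main() {
	htc := &hashTableContext{rowTables: []*rowTable{{segments: []*rowTableSegment{{totalBytes: 64}}}}}
	htc.clearAllSegmentsInRowTable()
	// The tables are still there for the next restore round, only emptied.
	fmt.Println(len(htc.rowTables), len(htc.rowTables[0].segments)) // 1 0
}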
pkg/executor/join/hash_join_v2.go
Outdated
@@ -301,6 +299,7 @@ func (e *HashJoinV2Exec) Open(ctx context.Context) error {
	e.prepared = false
	return err
}
e.isMemoryClearedForTest = true
why set a test variable to true by default?
Each time a restore round finishes, we check whether all memory has been cleared. The variable is set to false only when some memory has not been cleared, so we need to set it to true at the beginning.
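A small sketch of that pattern (field and method names here are illustrative, not the PR's exact code): the flag starts as true and is only flipped to false when a post-restore check finds memory that was not released:

package main

import "fmt"

// restoreChecker sketches the reply: the flag starts optimistic (true) and is
// only flipped to false when a check after a restore round finds memory that
// was not released.
type restoreChecker struct {
	isMemoryClearedForTest bool
	trackedBytes           int64
}

func newRestoreChecker() *restoreChecker {
	return &restoreChecker{isMemoryClearedForTest: true}
}

func (c *restoreChecker) checkAfterRestoreRound() {
	if c.trackedBytes != 0 {
		c.isMemoryClearedForTest = false
	}
}

func main() {
	c := newRestoreChecker()
	c.trackedBytes = 128 // simulate memory that was not cleared
	c.checkAfterRestoreRound()
	fmt.Println(c.isMemoryClearedForTest) // false
}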
func (e *HashJoinV2Exec) waitJoinWorkersAndCloseResultChan() {
	e.workerWg.Wait()
// finalWorker is responsible for scanning the row table after the probe is done and waking up the build fetcher
func (e *HashJoinV2Exec) finalWorker(syncer chan struct{}) {
I think if the join doesn't need to scan the hash table after probe, there is no need to start the final worker?
Yes, but the final worker will not create more goroutines when there is no need to scan the hash table.
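A hedged sketch of that idea: the final worker goroutine is always started so the build fetcher has something to wake up, but it only fans out extra scanning goroutines when the join actually needs a post-probe scan of the hash table (names and structure here are assumptions, not the PR's code):

package main

import (
	"fmt"
	"sync"
)

// startFinalWorker launches the final worker. It only spawns additional
// scanning goroutines when the join needs to scan the row table after probe,
// e.g. to emit unmatched build-side rows.
func startFinalWorker(needScanHT bool, scanConcurrency int, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		if !needScanHT {
			return // no extra goroutines are created
		}
		var scanWg sync.WaitGroup
		for i := 0; i < scanConcurrency; i++ {
			scanWg.Add(1)
			go func(id int) {
				defer scanWg.Done()
				fmt.Println("scanning row table, worker", id)
			}(i)
		}
		scanWg.Wait()
	}()
}

func main() {
	var wg sync.WaitGroup
	startFinalWorker(false, 4, &wg) // inner-join-like case: nothing to scan
	wg.Wait()
}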
@@ -588,40 +524,31 @@ func (w *ProbeWorkerV2) getNewJoinResult() (bool, *hashjoinWorkerResult) {
func (e *HashJoinV2Exec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
	if !e.prepared {
		e.initHashTableContext()
		e.initializeForProbe()
why move e.initializeForProbe() out from e.fetchAndProbeHashTable()?
Because joinResultCh in e.initializeForProbe() needs to be initialized before starting the build fetcher, as joinResultCh is also used for transmitting errors.
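A minimal sketch of the ordering argument: the channel must exist before the build fetcher goroutine starts, because the fetcher reports its failures through that same channel (the types and method names here are simplified stand-ins, not the PR's code):

package main

import (
	"errors"
	"fmt"
)

// result is a trimmed stand-in for the hash join worker result; only the
// error matters for this sketch.
type result struct{ err error }

type exec struct{ joinResultCh chan *result }

func (e *exec) initializeForProbe() { e.joinResultCh = make(chan *result, 1) }

// startBuildFetcher reports failures through joinResultCh, so the channel
// must already be created when this goroutine starts.
func (e *exec) startBuildFetcher() {
	go func() {
		// If the channel were still nil here, this send would block forever.
		e.joinResultCh <- &result{err: errors.New("build failed")}
	}()
}

func main() {
	e := &exec{}
	e.initializeForProbe() // must run before the fetcher is started
	e.startBuildFetcher()
	fmt.Println((<-e.joinResultCh).err)
}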
req.Reset()

result, ok := <-e.joinResultCh
e.recycleChunk(req, result)
why move this recycle code before the error check?
If an error happens, we will return directly and fail to recycle the chunk, which will make the workers hang.
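A small sketch of why the order matters, with illustrative types: the chunk is handed back to the worker's resource channel before the error check, so an error return cannot strand a worker waiting for its chunk:

package main

import (
	"errors"
	"fmt"
)

type chunk struct{}

type workerResult struct {
	err error
	chk *chunk
	src chan *chunk // the worker's resource channel the chunk must go back to
}

// next sketches the reply: recycle the chunk first, so that even on an error
// path the producing worker gets its chunk back and does not block.
func next(resultCh <-chan *workerResult) error {
	result, ok := <-resultCh
	if !ok {
		return nil
	}
	if result.chk != nil && result.src != nil {
		result.src <- result.chk // recycle first
	}
	if result.err != nil {
		return result.err // only then propagate the error
	}
	return nil
}

func main() {
	src := make(chan *chunk, 1)
	resultCh := make(chan *workerResult, 1)
	resultCh <- &workerResult{err: errors.New("probe failed"), chk: &chunk{}, src: src}
	fmt.Println(next(resultCh), len(src)) // the error is returned, the chunk is back
}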
e.buildFinished <- err
return false
for ifContinue {
	ifContinue, err = e.dispatchBuildTasksImpl(syncer)
dispatchBuildTasksImpl can be called multiple times here?
Yes, after spill is added, this function will be called multiple times.
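A minimal sketch of that loop shape: with spill, the build phase restores spilled partitions round by round, so the dispatch step repeats until the implementation reports nothing is left (dispatchOnce below stands in for dispatchBuildTasksImpl):

package main

import "fmt"

// dispatchRounds keeps dispatching build tasks until the callback reports
// there is nothing left to restore, mirroring the for-ifContinue loop above.
func dispatchRounds(dispatchOnce func() (bool, error)) (rounds int, err error) {
	ifContinue := true
	for ifContinue {
		ifContinue, err = dispatchOnce()
		if err != nil {
			return rounds, err
		}
		rounds++
	}
	return rounds, nil
}

func main() {
	pending := 3 // pretend three spilled partitions need to be restored
	rounds, _ := dispatchRounds(func() (bool, error) {
		pending--
		return pending > 0, nil
	})
	fmt.Println("dispatch rounds:", rounds) // 3
}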
@@ -114,12 +114,13 @@ func (s *parallelSortSpillAction) actionImpl(t *memory.Tracker) bool {
	s.spillHelper.cond.Wait()
}

// we may be in `needSpill` status, so it's necessary to check if we are in `notSpilled` status
unrelated changes?
Yes, it's unrelated to the hash join refinement. But I found that we can refine the comment in sort spill, so I put it in this PR.
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, currentRow), joinResult.chk, 0, false)
insertedRows++
}
j.appendBuildRow(meta, joinResult.chk, j.rowIter.getValue(), &insertedRows)
why change this?
I think it's more readable to put this code into a function, so readers can tell its purpose from the function name.
// We must ensure that all prebuild workers have exited before
// setting the `finished` flag and closing buildFetcherAndDispatcherSyncChan
fetcherAndWorkerSyncer.Wait()
e.finished.Store(true)
Does it mean this function needs to wait for all the probes to finish?
No, the build fetcher can exit directly without waiting for all probes to finish.
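A hedged sketch of that point: the wait before setting the finished flag is on fetcherAndWorkerSyncer, which is assumed to count only the pre-build workers, so the fetcher can exit while probe workers are still running:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// buildFetcherExit waits only on fetcherAndWorkerSyncer (pre-build workers),
// not on the probe workers, before setting the finished flag.
func buildFetcherExit(fetcherAndWorkerSyncer *sync.WaitGroup, finished *atomic.Bool) {
	fetcherAndWorkerSyncer.Wait() // waits for pre-build workers, not probes
	finished.Store(true)
}

func main() {
	var preBuildWorkers sync.WaitGroup
	var finished atomic.Bool

	preBuildWorkers.Add(1)
	go preBuildWorkers.Done() // pre-build workers finish quickly

	// Probe workers are assumed to still be running at this point; the
	// fetcher does not wait for them.
	buildFetcherExit(&preBuildWorkers, &finished)
	fmt.Println("finished flag:", finished.Load())
}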
@xzhangxian1008: The following tests failed.
PR needs rebase.
What problem does this PR solve?
Issue Number: ref #55153
Problem Summary:
What changed and how does it work?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.