Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: track the memroy usage in HashJoin probe phase (#41081) #41224

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ type hashRowContainer struct {
hashTable baseHashTable

rowContainer *chunk.RowContainer
<<<<<<< HEAD
=======
memTracker *memory.Tracker

// chkBuf buffer the data reads from the disk if rowContainer is spilled.
chkBuf *chunk.Chunk
chkBufSizeForOneProbe int64
>>>>>>> 5cb84186dc (executor: track the memroy usage in HashJoin probe phase (#41081))
}

func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
Expand All @@ -104,6 +112,88 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer {
return &newHRC
}

<<<<<<< HEAD
=======
// GetMatchedRows get matched rows from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
func (c *hashRowContainer) GetMatchedRows(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row) ([]chunk.Row, error) {
matchedRows, _, err := c.GetMatchedRowsAndPtrs(probeKey, probeRow, hCtx, matched, nil, false)
return matchedRows, err
}

func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRow chunk.Row,
probeKeyNullBits *bitmap.ConcurrentBitmap, matched []chunk.Row, needCheckBuildRowPos, needCheckProbeRowPos []int) ([]chunk.Row, error) {
// for NAAJ probe row with null, we should match them with all build rows.
var (
ok bool
err error
innerPtrs []chunk.RowPtr
)
c.hashTable.Iter(
func(_ uint64, e *entry) {
entryAddr := e
for entryAddr != nil {
innerPtrs = append(innerPtrs, entryAddr.ptr)
entryAddr = entryAddr.next
}
})
matched = matched[:0]
if len(innerPtrs) == 0 {
return matched, nil
}
// all built bucket rows come from hash table, their bitmap are all nil (doesn't contain any null). so
// we could only use the probe null bits to filter valid rows.
if probeKeyNullBits != nil && len(probeHCtx.naKeyColIdx) > 1 {
// if len(probeHCtx.naKeyColIdx)=1
// that means the NA-Join probe key is directly a (null) <-> (fetch all buckets), nothing to do.
// else like
// (null, 1, 2), we should use the not-null probe bit to filter rows. Only fetch rows like
// ( ? , 1, 2), that exactly with value as 1 and 2 in the second and third join key column.
needCheckProbeRowPos = needCheckProbeRowPos[:0]
needCheckBuildRowPos = needCheckBuildRowPos[:0]
keyColLen := len(c.hCtx.naKeyColIdx)
for i := 0; i < keyColLen; i++ {
// since all bucket is from hash table (Not Null), so the buildSideNullBits check is eliminated.
if probeKeyNullBits.UnsafeIsSet(i) {
continue
}
needCheckBuildRowPos = append(needCheckBuildRowPos, c.hCtx.naKeyColIdx[i])
needCheckProbeRowPos = append(needCheckProbeRowPos, probeHCtx.naKeyColIdx[i])
}
}
var mayMatchedRow chunk.Row
for _, ptr := range innerPtrs {
mayMatchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
if err != nil {
return nil, err
}
if probeKeyNullBits != nil && len(probeHCtx.naKeyColIdx) > 1 {
// check the idxs-th value of the join columns.
ok, err = codec.EqualChunkRow(c.sc, mayMatchedRow, c.hCtx.allTypes, needCheckBuildRowPos, probeSideRow, probeHCtx.allTypes, needCheckProbeRowPos)
if err != nil {
return nil, err
}
if !ok {
continue
}
// once ok. just append the (maybe) valid build row for latter other conditions check if any.
}
matched = append(matched, mayMatchedRow)
}
return matched, nil
}

// signalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered.
const signalCheckpointForJoin int = 1 << 14

// rowSize is the size of Row.
const rowSize = int64(unsafe.Sizeof(chunk.Row{}))

// rowPtrSize is the size of RowPtr.
const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{}))

>>>>>>> 5cb84186dc (executor: track the memroy usage in HashJoin probe phase (#41081))
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
Expand All @@ -114,9 +204,27 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
}
matched = make([]chunk.Row, 0, len(innerPtrs))
var matchedRow chunk.Row
<<<<<<< HEAD
matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs))
for _, ptr := range innerPtrs {
matchedRow, err = c.rowContainer.GetRow(ptr)
=======
matchedPtrs = matchedPtrs[:0]

// Some variables used for memTracker.
var (
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
lastChunkBufPointer *chunk.Chunk = nil
memDelta int64 = 0
)
c.chkBuf = nil
c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*rowPtrSize)
defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta)
c.chkBufSizeForOneProbe = 0

for i, ptr := range innerPtrs {
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
>>>>>>> 5cb84186dc (executor: track the memroy usage in HashJoin probe phase (#41081))
if err != nil {
return
}
Expand All @@ -125,6 +233,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
if err != nil {
return
}
if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil {
lastChunkSize := lastChunkBufPointer.MemoryUsage()
c.chkBufSizeForOneProbe += lastChunkSize
memDelta += lastChunkSize
}
lastChunkBufPointer = c.chkBuf
if i&signalCheckpointForJoin == 0 {
// Trigger Consume for checking the OOM Action signal
memDelta += int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize - matchedDataSize
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
c.memTracker.Consume(memDelta + 1)
memDelta = 0
}
if !ok {
atomic.AddInt64(&c.stat.probeCollision, 1)
continue
Expand Down
17 changes: 17 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2728,3 +2728,20 @@ func (s *testSuiteJoinSerial) TestIssue31129(c *C) {
c.Assert(failpoint.Disable(fpName1), IsNil)
c.Assert(failpoint.Disable(fpName2), IsNil)
}

func TestCartesianJoinPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
tk.MustExec("set tidb_mem_quota_query = 1 << 30")
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
tk.MustExec("set global tidb_enable_tmp_storage_on_oom = off;")
for i := 0; i < 14; i++ {
tk.MustExec("insert into t select * from t")
}
err := tk.QueryToErr("desc analyze select * from t t1, t t2, t t3, t t4, t t5, t t6;")
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
}