Skip to content

Commit

Permalink
executor: track the memory usage for building range in IndexLookUpExe…
Browse files Browse the repository at this point in the history
…cutor (#56497) (#56877)

close #56440
  • Loading branch information
ti-chi-bot authored Nov 5, 2024
1 parent 3508c6b commit dc5f812
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
12 changes: 10 additions & 2 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"sort"
"sync/atomic"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -765,8 +766,10 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx
for i := range krs {
krs[i] = make([]kv.KeyRange, 0, len(ranges))
}

const checkSignalStep = 8
if memTracker != nil {
memTracker.Consume(int64(unsafe.Sizeof(kv.KeyRange{})) * int64(len(ranges)))
}
const checkSignalStep = 64
var estimatedMemUsage int64
// encodeIndexKey and EncodeIndexSeekKey is time-consuming, thus we need to
// check the interrupt signal periodically.
Expand Down Expand Up @@ -794,6 +797,11 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx
if interruptSignal != nil && interruptSignal.Load().(bool) {
return kv.NewPartitionedKeyRanges(nil), nil
}
if memTracker != nil {
// We use the Tracker.Consume function to check the memory usage of the current SQL.
// If the memory exceeds the quota, kill the SQL.
memTracker.Consume(1)
}
}
}
return kv.NewPartitionedKeyRanges(krs), nil
Expand Down
12 changes: 10 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,14 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
return err
}
}

if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.id, -1)
}
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

err = e.buildTableKeyRanges()
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -521,7 +529,7 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) {
if e.index.ID == -1 {
kvRange, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges)
} else {
kvRange, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, ranges, e.feedback)
kvRange, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, physicalID, e.index.ID, ranges, e.feedback, e.memTracker, nil)
}
if err != nil {
return err
Expand All @@ -534,7 +542,7 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) {
if e.index.ID == -1 {
kvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, e.ranges)
} else {
kvRanges, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, e.ranges, e.feedback)
kvRanges, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, physicalID, e.index.ID, e.ranges, e.feedback, e.memTracker, nil)
}
e.kvRanges = kvRanges.FirstPartitionRange()
}
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwcAndWithMemoryTracker(t *testing.T) {
}

require.Equal(t, 2*bytesConsumed1, bytesConsumed2)
require.Equal(t, int64(20760), bytesConsumed1)
require.Equal(t, int64(25570), bytesConsumed1)
}

func generateIndexRange(vals ...int64) *ranger.Range {
Expand Down

0 comments on commit dc5f812

Please sign in to comment.