Skip to content

Commit

Permalink
executor, util: make tmp-storage-quota take affect (#45549)
Browse files Browse the repository at this point in the history
close #45161
  • Loading branch information
wshwsh12 authored Aug 1, 2023
1 parent b42a81a commit 838b367
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
6 changes: 5 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type dataSourceExecutor interface {

const (
// globalPanicStorageExceed represents the panic message when out of storage quota.
globalPanicStorageExceed string = "Out Of Global Storage Quota!"
globalPanicStorageExceed string = "Out Of Quota For Local Temporary Space!"
// globalPanicMemoryExceed represents the panic message when out of memory limit.
globalPanicMemoryExceed string = "Out Of Global Memory Limit!"
// globalPanicAnalyzeMemoryExceed represents the panic message when out of analyze memory limit.
Expand Down Expand Up @@ -2014,6 +2014,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.MemTracker.UnbindActions()
vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
vars.MemTracker.ResetMaxConsumed()
vars.DiskTracker.Detach()
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID.Store(vars.ConnectionID)
vars.StmtCtx.TableStats = make(map[int64]interface{})
Expand Down Expand Up @@ -2054,6 +2055,9 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
globalConfig := config.GetGlobalConfig()
if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil {
sc.DiskTracker.AttachTo(vars.DiskTracker)
if GlobalDiskUsageTracker != nil {
vars.DiskTracker.AttachTo(GlobalDiskUsageTracker)
}
}
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
Expand Down
13 changes: 13 additions & 0 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package chunk

import (
"errors"
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -140,6 +141,18 @@ func (c *RowContainer) SpillToDisk() {
n := c.m.records.inMemory.NumChunks()
c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes())
c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker)
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("%v", r)
c.m.records.spillError = err
logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err))
}
}()
failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) {
if val.(bool) {
panic("out of disk quota when spilling")
}
})
for i := 0; i < n; i++ {
chk := c.m.records.inMemory.GetChunk(i)
err = c.m.records.inDisk.Add(chk)
Expand Down
31 changes: 31 additions & 0 deletions util/chunk/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,37 @@ func TestReadAfterSpillWithRowContainerReader(t *testing.T) {
}
}

func TestPanicWhenSpillToDisk(t *testing.T) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
sz := 20
chk := NewChunkWithCapacity(fields, sz)
for i := 0; i < sz; i++ {
chk.AppendInt64(0, int64(i))
}

rc := NewRowContainer(fields, sz)
tracker := rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota"))
}()
require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.True(t, rc.AlreadySpilledSafeForTest())

_, err := rc.GetRow(RowPtr{})
require.EqualError(t, err, "out of disk quota when spilling")
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
}

func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
benchmarkRowContainerReaderInDiskWithRowLength(b, 512)
}
Expand Down

0 comments on commit 838b367

Please sign in to comment.