From 838b3674752d9626f0dd3076798c543f69ef609e Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Tue, 1 Aug 2023 20:22:08 +0800 Subject: [PATCH] executor, util: make tmp-storage-quota take affect (#45549) close pingcap/tidb#45161 --- executor/executor.go | 6 +++++- util/chunk/row_container.go | 13 +++++++++++++ util/chunk/row_container_test.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/executor/executor.go b/executor/executor.go index c477f25c435d8..e6856a6703193 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -138,7 +138,7 @@ type dataSourceExecutor interface { const ( // globalPanicStorageExceed represents the panic message when out of storage quota. - globalPanicStorageExceed string = "Out Of Global Storage Quota!" + globalPanicStorageExceed string = "Out Of Quota For Local Temporary Space!" // globalPanicMemoryExceed represents the panic message when out of memory limit. globalPanicMemoryExceed string = "Out Of Global Memory Limit!" // globalPanicAnalyzeMemoryExceed represents the panic message when out of analyze memory limit. @@ -2014,6 +2014,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { vars.MemTracker.UnbindActions() vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) vars.MemTracker.ResetMaxConsumed() + vars.DiskTracker.Detach() vars.DiskTracker.ResetMaxConsumed() vars.MemTracker.SessionID.Store(vars.ConnectionID) vars.StmtCtx.TableStats = make(map[int64]interface{}) @@ -2054,6 +2055,9 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { globalConfig := config.GetGlobalConfig() if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil { sc.DiskTracker.AttachTo(vars.DiskTracker) + if GlobalDiskUsageTracker != nil { + vars.DiskTracker.AttachTo(GlobalDiskUsageTracker) + } } if execStmt, ok := s.(*ast.ExecuteStmt); ok { prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 7ae1a67879b03..bf095dbed18db 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -16,6 +16,7 @@ package chunk import ( "errors" + "fmt" "sort" "sync" "time" @@ -140,6 +141,18 @@ func (c *RowContainer) SpillToDisk() { n := c.m.records.inMemory.NumChunks() c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes()) c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker) + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("%v", r) + c.m.records.spillError = err + logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err)) + } + }() + failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) { + if val.(bool) { + panic("out of disk quota when spilling") + } + }) for i := 0; i < n; i++ { chk := c.m.records.inMemory.GetChunk(i) err = c.m.records.inDisk.Add(chk) diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index 2160496813cfd..638a42090bace 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -439,6 +439,37 @@ func TestReadAfterSpillWithRowContainerReader(t *testing.T) { } } +func TestPanicWhenSpillToDisk(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + sz := 20 + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + + rc := NewRowContainer(fields, sz) + tracker := rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota")) + }() + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.True(t, rc.AlreadySpilledSafeForTest()) + + _, err := rc.GetRow(RowPtr{}) + require.EqualError(t, err, "out of disk quota when spilling") + require.EqualError(t, rc.Add(chk), "out of disk quota when spilling") +} + func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) { benchmarkRowContainerReaderInDiskWithRowLength(b, 512) }