diff --git a/executor/executor.go b/executor/executor.go index dd7a82dc1fadb..a9a87072ecab6 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -115,7 +115,7 @@ type baseExecutor struct { const ( // globalPanicStorageExceed represents the panic message when out of storage quota. - globalPanicStorageExceed string = "Out Of Global Storage Quota!" + globalPanicStorageExceed string = "Out Of Quota For Local Temporary Space!" // globalPanicMemoryExceed represents the panic message when out of memory limit. globalPanicMemoryExceed string = "Out Of Global Memory Limit!" ) @@ -1705,6 +1705,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { if globalConfig.OOMUseTmpStorage && GlobalDiskUsageTracker != nil { sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker) } +<<<<<<< HEAD switch globalConfig.OOMAction { case config.OOMActionCancel: action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID} @@ -1716,6 +1717,63 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID} action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) sc.MemTracker.SetActionOnExceed(action) +======= + + sc.StatsLoad.Timeout = 0 + sc.StatsLoad.NeededItems = nil + sc.StatsLoad.ResultCh = nil + + sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow + + vars.MemTracker.Detach() + vars.MemTracker.UnbindActions() + vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) + vars.MemTracker.ResetMaxConsumed() + vars.DiskTracker.Detach() + vars.DiskTracker.ResetMaxConsumed() + vars.MemTracker.SessionID.Store(vars.ConnectionID) + vars.StmtCtx.TableStats = make(map[int64]interface{}) + + isAnalyze := false + if execStmt, ok := s.(*ast.ExecuteStmt); ok { + prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) + if err != nil { + return err + } + _, isAnalyze = prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt) + } else if _, ok := s.(*ast.AnalyzeTableStmt); ok { + isAnalyze = true + } + if isAnalyze { + sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1) + vars.MemTracker.SetBytesLimit(-1) + vars.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker) + } else { + sc.InitMemTracker(memory.LabelForSQLText, -1) + } + logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota + switch variable.OOMAction.Load() { + case variable.OOMActionCancel: + action := &memory.PanicOnExceed{ConnID: vars.ConnectionID} + action.SetLogHook(logOnQueryExceedMemQuota) + vars.MemTracker.SetActionOnExceed(action) + case variable.OOMActionLog: + fallthrough + default: + action := &memory.LogOnExceed{ConnID: vars.ConnectionID} + action.SetLogHook(logOnQueryExceedMemQuota) + vars.MemTracker.SetActionOnExceed(action) + } + sc.MemTracker.SessionID.Store(vars.ConnectionID) + sc.MemTracker.AttachTo(vars.MemTracker) + sc.InitDiskTracker(memory.LabelForSQLText, -1) + globalConfig := config.GetGlobalConfig() + if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil { + sc.DiskTracker.AttachTo(vars.DiskTracker) + if GlobalDiskUsageTracker != nil { + vars.DiskTracker.AttachTo(GlobalDiskUsageTracker) + } +>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549)) } if execStmt, ok := s.(*ast.ExecuteStmt); ok { prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 3407022f4718f..00edbf26f1b02 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -16,6 +16,7 @@ package chunk import ( "errors" + "fmt" "sort" "sync" "time" @@ -137,7 +138,23 @@ func (c *RowContainer) SpillToDisk() { N := c.m.records.inMemory.NumChunks() c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes()) c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker) +<<<<<<< HEAD for i := 0; i < N; i++ { +======= + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("%v", r) + c.m.records.spillError = err + logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err)) + } + }() + failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) { + if val.(bool) { + panic("out of disk quota when spilling") + } + }) + for i := 0; i < n; i++ { +>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549)) chk := c.m.records.inMemory.GetChunk(i) err = c.m.records.inDisk.Add(chk) if err != nil { diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index 2bf8ef26644bc..c7267c5fee99a 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -303,3 +303,221 @@ func TestActionBlocked(t *testing.T) { ac.Action(tracker) require.GreaterOrEqual(t, time.Since(starttime), 200*time.Millisecond) } +<<<<<<< HEAD +======= + +func insertBytesRowsIntoRowContainer(t *testing.T, chkCount int, rowPerChk int) (*RowContainer, [][]byte) { + longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(4096).Build() + fields := []*types.FieldType{&longVarCharTyp} + + rc := NewRowContainer(fields, chkCount) + + allRows := [][]byte{} + // insert chunks + for i := 0; i < chkCount; i++ { + chk := NewChunkWithCapacity(fields, rowPerChk) + // insert rows for each chunk + for j := 0; j < rowPerChk; j++ { + length := rand2.Uint32() + randomBytes := make([]byte, length%4096) + _, err := rand.Read(randomBytes) + require.NoError(t, err) + + chk.AppendBytes(0, randomBytes) + allRows = append(allRows, randomBytes) + } + require.NoError(t, rc.Add(chk)) + } + + return rc, allRows +} + +func TestRowContainerReaderInDisk(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16) + rc.SpillToDisk() + + reader := NewRowContainerReader(rc) + defer reader.Close() + for i := 0; i < 16; i++ { + for j := 0; j < 16; j++ { + row := reader.Current() + require.Equal(t, allRows[i*16+j], row.GetBytes(0)) + reader.Next() + } + } +} + +func TestCloseRowContainerReader(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16) + rc.SpillToDisk() + + // read 8.5 of these chunks + reader := NewRowContainerReader(rc) + defer reader.Close() + for i := 0; i < 8; i++ { + for j := 0; j < 16; j++ { + row := reader.Current() + require.Equal(t, allRows[i*16+j], row.GetBytes(0)) + reader.Next() + } + } + for j := 0; j < 8; j++ { + row := reader.Current() + require.Equal(t, allRows[8*16+j], row.GetBytes(0)) + reader.Next() + } +} + +func TestConcurrentSpillWithRowContainerReader(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024) + + var wg sync.WaitGroup + // concurrently read and spill to disk + wg.Add(1) + go func() { + defer wg.Done() + reader := NewRowContainerReader(rc) + defer reader.Close() + + for i := 0; i < 16; i++ { + for j := 0; j < 1024; j++ { + row := reader.Current() + require.Equal(t, allRows[i*1024+j], row.GetBytes(0)) + reader.Next() + } + } + }() + rc.SpillToDisk() + wg.Wait() +} + +func TestReadAfterSpillWithRowContainerReader(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = t.TempDir() + }) + + rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024) + + reader := NewRowContainerReader(rc) + defer reader.Close() + for i := 0; i < 8; i++ { + for j := 0; j < 1024; j++ { + row := reader.Current() + require.Equal(t, allRows[i*1024+j], row.GetBytes(0)) + reader.Next() + } + } + rc.SpillToDisk() + for i := 8; i < 16; i++ { + for j := 0; j < 1024; j++ { + row := reader.Current() + require.Equal(t, allRows[i*1024+j], row.GetBytes(0)) + reader.Next() + } + } +} + +func TestPanicWhenSpillToDisk(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + sz := 20 + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + + rc := NewRowContainer(fields, sz) + tracker := rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota")) + }() + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.True(t, rc.AlreadySpilledSafeForTest()) + + _, err := rc.GetRow(RowPtr{}) + require.EqualError(t, err, "out of disk quota when spilling") + require.EqualError(t, rc.Add(chk), "out of disk quota when spilling") +} + +func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) { + benchmarkRowContainerReaderInDiskWithRowLength(b, 512) +} + +func BenchmarkRowContainerReaderInDiskWithRowSize1024(b *testing.B) { + benchmarkRowContainerReaderInDiskWithRowLength(b, 1024) +} + +func BenchmarkRowContainerReaderInDiskWithRowSize4096(b *testing.B) { + benchmarkRowContainerReaderInDiskWithRowLength(b, 4096) +} + +func benchmarkRowContainerReaderInDiskWithRowLength(b *testing.B, rowLength int) { + b.StopTimer() + + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.TempStoragePath = b.TempDir() + }) + + longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(rowLength).Build() + fields := []*types.FieldType{&longVarCharTyp} + + randomBytes := make([]byte, rowLength) + _, err := rand.Read(randomBytes) + require.NoError(b, err) + + // create a row container which stores the data in disk + rc := NewRowContainer(fields, 1<<10) + rc.SpillToDisk() + + // insert `b.N * 1<<10` rows (`b.N` chunks) into the rc + for i := 0; i < b.N; i++ { + chk := NewChunkWithCapacity(fields, 1<<10) + for j := 0; j < 1<<10; j++ { + chk.AppendBytes(0, randomBytes) + } + + rc.Add(chk) + } + + reader := NewRowContainerReader(rc) + defer reader.Close() + b.StartTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < 1<<10; i++ { + reader.Next() + } + } + require.NoError(b, reader.Error()) +} +>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))