From f48aaf25c5a9334a50c9ff102788456a19b96b35 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 28 Aug 2024 18:04:47 +0800 Subject: [PATCH 1/2] statistics: add upper bound of retry for sync load Signed-off-by: Weizhen Wang --- statistics/handle/BUILD.bazel | 3 +- statistics/handle/handle_hist.go | 67 +++++++++++++--------- statistics/handle/handle_hist_test.go | 82 ++++++++++++++++++++++++++- 3 files changed, 120 insertions(+), 32 deletions(-) diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel index 43dc0d0514918..34b15ad7d9c41 100644 --- a/statistics/handle/BUILD.bazel +++ b/statistics/handle/BUILD.bazel @@ -74,13 +74,12 @@ go_test( embed = [":handle"], flaky = True, race = "on", - shard_count = 34, + shard_count = 35, deps = [ "//config", "//domain", "//parser/model", "//parser/mysql", - "//sessionctx", "//sessionctx/stmtctx", "//sessionctx/variable", "//statistics", diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go index a29ea7067a612..231a748cead9c 100644 --- a/statistics/handle/handle_hist.go +++ b/statistics/handle/handle_hist.go @@ -37,6 +37,9 @@ import ( "golang.org/x/sync/singleflight" ) +// RetryCount is the max retry count for a sync load task. +const RetryCount = 3 + var globalStatsSyncLoadSingleFlight singleflight.Group type statsWrapper struct { @@ -58,6 +61,7 @@ type NeededItemTask struct { TableItemID model.TableItemID ToTimeout time.Time ResultCh chan stmtctx.StatsLoadResult + Retry int } // SendLoadRequests send neededColumns requests @@ -210,7 +214,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry var lastTask *NeededItemTask for { - task, err := h.HandleOneTask(ctx, lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit) + task, err := h.HandleOneTask(lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit) lastTask = task if err != nil { switch err { @@ -228,7 +232,10 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW } // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. -func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) { +// - If the task is handled successfully, return nil, nil. +// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep. +// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep. +func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) { defer func() { // recover for each task, worker keeps working if r := recover(); r != nil { @@ -237,7 +244,7 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask } }() if lastTask == nil { - task, err = h.drainColTask(sctx, exit) + task, err = h.drainColTask(exit) if err != nil { if err != errExit { logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err)) @@ -247,27 +254,36 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask } else { task = lastTask } + result := stmtctx.StatsLoadResult{Item: task.TableItemID} resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) { - return h.handleOneItemTask(task, readerCtx, ctx) + err := h.handleOneItemTask(task, readerCtx, ctx) + return nil, err }) timeout := time.Until(task.ToTimeout) select { - case result := <-resultChan: - if result.Err == nil { - slr := result.Val.(*stmtctx.StatsLoadResult) - if slr.Error != nil { - return task, slr.Error - } - task.ResultCh <- *slr + case sr := <-resultChan: + // sr.Val is always nil. + if sr.Err == nil { + task.ResultCh <- result return nil, nil } - return task, result.Err + if !isVaildForRetry(task) { + result.Error = sr.Err + task.ResultCh <- result + return nil, nil + } + return task, sr.Err case <-time.After(timeout): return task, nil } } -func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (result *stmtctx.StatsLoadResult, err error) { +func isVaildForRetry(task *NeededItemTask) bool { + task.Retry++ + return task.Retry <= RetryCount +} + +func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (err error) { defer func() { // recover for each task, worker keeps working if r := recover(); r != nil { @@ -275,24 +291,23 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC err = errors.Errorf("stats loading panicked: %v", r) } }() - result = &stmtctx.StatsLoadResult{Item: task.TableItemID} - item := result.Item + item := task.TableItemID oldCache := h.statsCache.Load().(statsCache) tbl, ok := oldCache.Get(item.TableID) if !ok { - return result, nil + return nil } wrapper := &statsWrapper{} if item.IsIndex { index, ok := tbl.Indices[item.ID] if !ok || index.IsFullLoad() { - return result, nil + return nil } wrapper.idx = index } else { col, ok := tbl.Columns[item.ID] if !ok || col.IsFullLoad() { - return result, nil + return nil } wrapper.col = col } @@ -302,8 +317,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC needUpdate := false wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader) if err != nil { - result.Error = err - return result, err + return err } if item.IsIndex { if wrapper.idx != nil { @@ -315,10 +329,10 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC } } metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) - if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) { - return result, nil + if needUpdate { + h.updateCachedItem(item, wrapper.col, wrapper.idx) } - return nil, nil + return nil } func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) { @@ -433,7 +447,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re } // drainColTask will hang until a column task can return, and either task or error will be returned. -func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*NeededItemTask, error) { +func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) { // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh for { select { @@ -446,7 +460,6 @@ func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*Nee // if the task has already timeout, no sql is sync-waiting for it, // so do not handle it just now, put it to another channel with lower priority if time.Now().After(task.ToTimeout) { - task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task) continue } @@ -515,12 +528,12 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co oldCache := h.statsCache.Load().(statsCache) tbl, ok := oldCache.Get(item.TableID) if !ok { - return true + return false } if !item.IsIndex && colHist != nil { c, ok := tbl.Columns[item.ID] if !ok || c.IsFullLoad() { - return true + return false } tbl = tbl.Copy() tbl.Columns[c.ID] = colHist diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go index 6c45904aa8635..2167275cdf9c2 100644 --- a/statistics/handle/handle_hist_test.go +++ b/statistics/handle/handle_hist_test.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/testkit" @@ -206,7 +205,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { exitCh := make(chan struct{}) require.NoError(t, failpoint.Enable(fp.failPath, fp.inTerms)) - task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) + task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.Error(t, err1) require.NotNil(t, task1) for _, resultCh := range stmtCtx1.StatsLoad.ResultCh { @@ -227,7 +226,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { } require.NoError(t, failpoint.Disable(fp.failPath)) - task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) + task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.NoError(t, err3) require.Nil(t, task3) @@ -250,3 +249,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Greater(t, hg.Len()+topn.Num(), 0) } } + +func TestRetry(t *testing.T) { + originConfig := config.GetGlobalConfig() + newConfig := config.NewConfig() + newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + config.StoreGlobalConfig(newConfig) + defer config.StoreGlobalConfig(originConfig) + store, dom := testkit.CreateMockStoreAndDomain(t) + + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t") + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") + testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)") + + oriLease := dom.StatsHandle().Lease() + dom.StatsHandle().SetLease(1) + defer func() { + dom.StatsHandle().SetLease(oriLease) + }() + testKit.MustExec("analyze table t") + + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + + h := dom.StatsHandle() + + neededColumns := make([]model.TableItemID, 1) + neededColumns[0] = model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false} + timeout := time.Nanosecond * mathutil.MaxInt + + // clear statsCache + h.Clear() + require.NoError(t, dom.StatsHandle().Update(is)) + + // no stats at beginning + stat := h.GetTableStats(tableInfo) + c, ok := stat.Columns[tableInfo.Columns[2].ID] + require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0)) + + stmtCtx1 := &stmtctx.StatementContext{} + h.SendLoadRequests(stmtCtx1, neededColumns, timeout) + stmtCtx2 := &stmtctx.StatementContext{} + h.SendLoadRequests(stmtCtx2, neededColumns, timeout) + + exitCh := make(chan struct{}) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail", "return(true)")) + var ( + task1 *handle.NeededItemTask + err1 error + ) + readerCtx := &handle.StatsReaderContext{} + for i := 0; i < handle.RetryCount; i++ { + task1, err1 = h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) + require.Error(t, err1) + require.NotNil(t, task1) + select { + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get nothing") + t.FailNow() + default: + } + } + result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) + require.NoError(t, err1) + require.Nil(t, result) + select { + case <-task1.ResultCh: + default: + t.Logf("task1.ResultCh should get nothing") + t.FailNow() + } + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail")) +} From 22412254ac2c5953c9bf5ad2afda0fb33525adf5 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 29 Aug 2024 14:11:54 +0800 Subject: [PATCH 2/2] update Signed-off-by: Weizhen Wang --- statistics/handle/handle_hist_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go index 2167275cdf9c2..6189281d99c42 100644 --- a/statistics/handle/handle_hist_test.go +++ b/statistics/handle/handle_hist_test.go @@ -318,11 +318,11 @@ func TestRetry(t *testing.T) { result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.NoError(t, err1) require.Nil(t, result) - select { - case <-task1.ResultCh: - default: - t.Logf("task1.ResultCh should get nothing") - t.FailNow() + for _, resultCh := range stmtCtx1.StatsLoad.ResultCh { + rs1, ok1 := <-resultCh + require.True(t, rs1.Shared) + require.True(t, ok1) + require.Error(t, rs1.Val.(stmtctx.StatsLoadResult).Error) } require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail")) }