From 9a8df941e88f1f05289c4013cc489e2b46a86910 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 6 Jun 2024 10:50:56 +0800 Subject: [PATCH] statistics: support global singleflight for sync load (#52796) (#53838) close pingcap/tidb#52797 --- sessionctx/stmtctx/BUILD.bazel | 1 + sessionctx/stmtctx/stmtctx.go | 3 +- statistics/handle/handle_hist.go | 102 +++++++++++++------------- statistics/handle/handle_hist_test.go | 56 ++++++++------ 4 files changed, 87 insertions(+), 75 deletions(-) diff --git a/sessionctx/stmtctx/BUILD.bazel b/sessionctx/stmtctx/BUILD.bazel index 65fcbb08fc697..f21ceca9c25ea 100644 --- a/sessionctx/stmtctx/BUILD.bazel +++ b/sessionctx/stmtctx/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", + "@org_golang_x_sync//singleflight", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index d8e0bc7cd5fb7..244a877a6bda2 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -39,6 +39,7 @@ import ( "github.com/tikv/client-go/v2/util" atomic2 "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/sync/singleflight" ) const ( @@ -330,7 +331,7 @@ type StatementContext struct { // NeededItems stores the columns/indices whose stats are needed for planner. NeededItems []model.TableItemID // ResultCh to receive stats loading results - ResultCh chan StatsLoadResult + ResultCh []<-chan singleflight.Result // LoadStartTime is to record the load start time to calculate latency LoadStartTime time.Time } diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go index a06363466acbd..5f9d7b26ff1f0 100644 --- a/statistics/handle/handle_hist.go +++ b/statistics/handle/handle_hist.go @@ -39,6 +39,8 @@ import ( // RetryCount is the max retry count for a sync load task. const RetryCount = 3 +var globalStatsSyncLoadSingleFlight singleflight.Group + type statsWrapper struct { col *statistics.Column idx *statistics.Index @@ -79,25 +81,26 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems } sc.StatsLoad.Timeout = timeout sc.StatsLoad.NeededItems = remainedItems - sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems)) - tasks := make([]*NeededItemTask, 0) + sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems)) for _, item := range remainedItems { - task := &NeededItemTask{ - TableItemID: item, - ToTimeout: time.Now().Local().Add(timeout), - ResultCh: sc.StatsLoad.ResultCh, - } - tasks = append(tasks, task) - } - timer := time.NewTimer(timeout) - defer timer.Stop() - for _, task := range tasks { - select { - case h.StatsLoad.NeededItemsCh <- task: - continue - case <-timer.C: - return errors.New("sync load stats channel is full and timeout sending task to channel") - } + localItem := item + resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) { + timer := time.NewTimer(timeout) + defer timer.Stop() + task := &NeededItemTask{ + TableItemID: localItem, + ToTimeout: time.Now().Local().Add(timeout), + ResultCh: make(chan stmtctx.StatsLoadResult, 1), + } + select { + case h.StatsLoad.NeededItemsCh <- task: + result := <-task.ResultCh + return result, nil + case <-timer.C: + return nil, errors.New("sync load stats channel is full and timeout sending task to channel") + } + }) + sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh) } sc.StatsLoad.LoadStartTime = time.Now() return nil @@ -123,26 +126,34 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error { metrics.SyncLoadCounter.Inc() timer := time.NewTimer(sc.StatsLoad.Timeout) defer timer.Stop() - for { + for _, resultCh := range sc.StatsLoad.ResultCh { select { - case result, ok := <-sc.StatsLoad.ResultCh: - if ok { - if result.HasError() { - errorMsgs = append(errorMsgs, result.ErrorMsg()) - } - delete(resultCheckMap, result.Item) - if len(resultCheckMap) == 0 { - metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds())) - return nil - } - } else { + case result, ok := <-resultCh: + if !ok { return errors.New("sync load stats channel closed unexpectedly") } + // this error is from statsSyncLoad.SendLoadRequests which start to task and send task into worker, + // not the stats loading error + if result.Err != nil { + errorMsgs = append(errorMsgs, result.Err.Error()) + } else { + val := result.Val.(stmtctx.StatsLoadResult) + // this error is from the stats loading error + if val.HasError() { + errorMsgs = append(errorMsgs, val.ErrorMsg()) + } + delete(resultCheckMap, val.Item) + } case <-timer.C: metrics.SyncLoadTimeoutCounter.Inc() return errors.New("sync load stats timeout") } } + if len(resultCheckMap) == 0 { + metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds())) + return nil + } + return nil } // removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache. @@ -240,28 +251,17 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC task = lastTask } result := stmtctx.StatsLoadResult{Item: task.TableItemID} - resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) { - err := h.handleOneItemTask(task, readerCtx, ctx) - return nil, err - }) - timeout := time.Until(task.ToTimeout) - select { - case sr := <-resultChan: - // sr.Val is always nil. - if sr.Err == nil { - task.ResultCh <- result - return nil, nil - } - if !isVaildForRetry(task) { - result.Error = sr.Err - task.ResultCh <- result - return nil, nil - } - return task, sr.Err - case <-time.After(timeout): - task.ToTimeout.Add(time.Duration(h.mu.ctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) - return task, nil + err = h.handleOneItemTask(task, readerCtx, ctx) + if err == nil { + task.ResultCh <- result + return nil, nil + } + if !isVaildForRetry(task) { + result.Error = err + task.ResultCh <- result + return nil, nil } + return task, err } func isVaildForRetry(task *NeededItemTask) bool { diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go index d39f110852ff4..78009e3c9746b 100644 --- a/statistics/handle/handle_hist_test.go +++ b/statistics/handle/handle_hist_test.go @@ -208,14 +208,23 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.Error(t, err1) require.NotNil(t, task1) - + for _, resultCh := range stmtCtx1.StatsLoad.ResultCh { + select { + case <-resultCh: + t.Logf("stmtCtx1.ResultCh should not get anything") + t.FailNow() + default: + } + } + for _, resultCh := range stmtCtx2.StatsLoad.ResultCh { + select { + case <-resultCh: + t.Logf("stmtCtx1.ResultCh should not get anything") + t.FailNow() + default: + } + } select { - case <-stmtCtx1.StatsLoad.ResultCh: - t.Logf("stmtCtx1.ResultCh should not get anything") - t.FailNow() - case <-stmtCtx2.StatsLoad.ResultCh: - t.Logf("stmtCtx2.ResultCh should not get anything") - t.FailNow() case <-task1.ResultCh: t.Logf("task1.ResultCh should not get anything") t.FailNow() @@ -226,17 +235,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.NoError(t, err3) require.Nil(t, task3) - - task, err3 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) - require.NoError(t, err3) - require.Nil(t, task) - - rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh - require.True(t, ok1) - require.Equal(t, neededColumns[0], rs1.Item) - rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh - require.True(t, ok2) - require.Equal(t, neededColumns[0], rs2.Item) + for _, resultCh := range stmtCtx1.StatsLoad.ResultCh { + rs1, ok1 := <-resultCh + require.True(t, rs1.Shared) + require.True(t, ok1) + require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID) + } + for _, resultCh := range stmtCtx2.StatsLoad.ResultCh { + rs1, ok1 := <-resultCh + require.True(t, rs1.Shared) + require.True(t, ok1) + require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID) + } stat = h.GetTableStats(tableInfo) hg = stat.Columns[tableInfo.Columns[2].ID].Histogram @@ -313,11 +323,11 @@ func TestRetry(t *testing.T) { result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh) require.NoError(t, err1) require.Nil(t, result) - select { - case <-task1.ResultCh: - default: - t.Logf("task1.ResultCh should get nothing") - t.FailNow() + for _, resultCh := range stmtCtx1.StatsLoad.ResultCh { + rs1, ok1 := <-resultCh + require.True(t, rs1.Shared) + require.True(t, ok1) + require.Error(t, rs1.Val.(stmtctx.StatsLoadResult).Error) } require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail")) }