diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel
index 09c16c2b3c00a..21d6048beaaa1 100644
--- a/pkg/statistics/handle/syncload/BUILD.bazel
+++ b/pkg/statistics/handle/syncload/BUILD.bazel
@@ -36,7 +36,7 @@ go_test(
     srcs = ["stats_syncload_test.go"],
     flaky = True,
     race = "on",
-    shard_count = 6,
+    shard_count = 7,
     deps = [
         ":syncload",
         "//pkg/config",
diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go
index cbcf22b46f4ab..42b9f01a3a5bb 100644
--- a/pkg/statistics/handle/syncload/stats_syncload.go
+++ b/pkg/statistics/handle/syncload/stats_syncload.go
@@ -44,7 +44,7 @@ import (
 )
 
 // RetryCount is the max retry count for a sync load task.
-const RetryCount = 3
+const RetryCount = 2
 
 // GetSyncLoadConcurrencyByCPU returns the concurrency of sync load by CPU.
 func GetSyncLoadConcurrencyByCPU() int {
@@ -114,9 +114,13 @@ func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHis
 	select {
 	case s.StatsLoad.NeededItemsCh <- task:
 		metrics.SyncLoadDedupCounter.Inc()
-		result, ok := <-task.ResultCh
-		intest.Assert(ok, "task.ResultCh cannot be closed")
-		return result, nil
+		select {
+		case <-timer.C:
+			return nil, errors.New("sync load took too long to return")
+		case result, ok := <-task.ResultCh:
+			intest.Assert(ok, "task.ResultCh cannot be closed")
+			return result, nil
+		}
 	case <-timer.C:
 		return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
 	}
diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go
index 43c5dfd815ad5..8d6a6f581fa8d 100644
--- a/pkg/statistics/handle/syncload/stats_syncload_test.go
+++ b/pkg/statistics/handle/syncload/stats_syncload_test.go
@@ -365,3 +365,47 @@ func TestRetry(t *testing.T) {
 	}
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
 }
+
+func TestSendLoadRequestsWaitTooLong(t *testing.T) {
+	originConfig := config.GetGlobalConfig()
+	newConfig := config.NewConfig()
+	newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel
+	newConfig.Performance.StatsLoadQueueSize = 10000
+	config.StoreGlobalConfig(newConfig)
+	defer config.StoreGlobalConfig(originConfig)
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b,c))")
+	tk.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")
+
+	oriLease := dom.StatsHandle().Lease()
+	dom.StatsHandle().SetLease(1)
+	defer func() {
+		dom.StatsHandle().SetLease(oriLease)
+	}()
+	tk.MustExec("analyze table t all columns")
+	h := dom.StatsHandle()
+	is := dom.InfoSchema()
+	tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
+	require.NoError(t, err)
+	tableInfo := tbl.Meta()
+	neededColumns := make([]model.StatsLoadItem, 0, len(tableInfo.Columns))
+	for _, col := range tableInfo.Columns {
+		neededColumns = append(neededColumns, model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: col.ID, IsIndex: false}, FullLoad: true})
+	}
+	stmtCtx := stmtctx.NewStmtCtx()
+	timeout := time.Nanosecond * 100
+	require.NoError(t, h.SendLoadRequests(stmtCtx, neededColumns, timeout))
+	for _, resultCh := range stmtCtx.StatsLoad.ResultCh {
+		rs1 := <-resultCh
+		require.Error(t, rs1.Err)
+	}
+	stmtCtx1 := stmtctx.NewStmtCtx()
+	require.NoError(t, h.SendLoadRequests(stmtCtx1, neededColumns, timeout))
+	for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
+		rs1 := <-resultCh
+		require.Error(t, rs1.Err)
+	}
+}
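A minimal standalone sketch (not part of the patch) of the nested-select pattern the fix introduces in `SendLoadRequests`: the same timer bounds both the send of the task and the wait for its result, so the caller can no longer block forever when no worker ever replies. All names here (`sendAndWait`, `tasks`, `results`) are illustrative, not TiDB APIs.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendAndWait enqueues a task and waits for its result, with both steps
// bounded by the same timeout, mirroring the structure of the patched code.
func sendAndWait(tasks chan struct{}, results <-chan string, timeout time.Duration) (string, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case tasks <- struct{}{}:
		// Task enqueued; also bound the wait for the result so a missing
		// worker cannot hang the caller indefinitely.
		select {
		case <-timer.C:
			return "", errors.New("sync load took too long to return")
		case result := <-results:
			return result, nil
		}
	case <-timer.C:
		return "", errors.New("timeout sending task to channel")
	}
}

func main() {
	tasks := make(chan struct{}, 1)
	results := make(chan string) // no worker ever writes here
	_, err := sendAndWait(tasks, results, 50*time.Millisecond)
	fmt.Println(err) // prints the "took too long to return" error
}
```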