Skip to content

Commit

Permalink
statistics: rightly deal with timout when to send sync load (#57712)
Browse files Browse the repository at this point in the history
close #57710
  • Loading branch information
hawkingrei authored Nov 27, 2024
1 parent 9cc4a20 commit d0de86b
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/handle/syncload/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ go_test(
srcs = ["stats_syncload_test.go"],
flaky = True,
race = "on",
shard_count = 6,
shard_count = 7,
deps = [
":syncload",
"//pkg/config",
Expand Down
12 changes: 8 additions & 4 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
)

// RetryCount is the max retry count for a sync load task.
const RetryCount = 3
const RetryCount = 2

// GetSyncLoadConcurrencyByCPU returns the concurrency of sync load by CPU.
func GetSyncLoadConcurrencyByCPU() int {
Expand Down Expand Up @@ -114,9 +114,13 @@ func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHis
select {
case s.StatsLoad.NeededItemsCh <- task:
metrics.SyncLoadDedupCounter.Inc()
result, ok := <-task.ResultCh
intest.Assert(ok, "task.ResultCh cannot be closed")
return result, nil
select {
case <-timer.C:
return nil, errors.New("sync load took too long to return")
case result, ok := <-task.ResultCh:
intest.Assert(ok, "task.ResultCh cannot be closed")
return result, nil
}
case <-timer.C:
return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,47 @@ func TestRetry(t *testing.T) {
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
}

func TestSendLoadRequestsWaitTooLong(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = -1 // no worker to consume channel
newConfig.Performance.StatsLoadQueueSize = 10000
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b,c))")
tk.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
tk.MustExec("analyze table t all columns")
h := dom.StatsHandle()
is := dom.InfoSchema()
tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()
neededColumns := make([]model.StatsLoadItem, 0, len(tableInfo.Columns))
for _, col := range tableInfo.Columns {
neededColumns = append(neededColumns, model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: col.ID, IsIndex: false}, FullLoad: true})
}
stmtCtx := stmtctx.NewStmtCtx()
timeout := time.Nanosecond * 100
require.NoError(t, h.SendLoadRequests(stmtCtx, neededColumns, timeout))
for _, resultCh := range stmtCtx.StatsLoad.ResultCh {
rs1 := <-resultCh
require.Error(t, rs1.Err)
}
stmtCtx1 := stmtctx.NewStmtCtx()
require.NoError(t, h.SendLoadRequests(stmtCtx1, neededColumns, timeout))
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
rs1 := <-resultCh
require.Error(t, rs1.Err)
}
}

0 comments on commit d0de86b

Please sign in to comment.