Skip to content

Commit

Permalink
statistics: add upper bound of retry for sync load (#52658) (#52818)
Browse files Browse the repository at this point in the history
close #52657
  • Loading branch information
ti-chi-bot authored May 16, 2024
1 parent ce9e337 commit d9788b9
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ go_test(
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 8,
shard_count = 9,
deps = [
"//pkg/config",
"//pkg/parser/model",
Expand Down
40 changes: 30 additions & 10 deletions pkg/statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import (
"golang.org/x/sync/singleflight"
)

// RetryCount is the max retry count for a sync load task.
const RetryCount = 3

type statsWrapper struct {
col *statistics.Column
idx *statistics.Index
Expand All @@ -57,6 +60,7 @@ type NeededItemTask struct {
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
TableItemID model.TableItemID
Retry int
}

// SendLoadRequests send neededColumns requests
Expand Down Expand Up @@ -203,6 +207,9 @@ func (h *Handle) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exit
}

// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
// - If the task is handled successfully, return nil, nil.
// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep.
// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep.
func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) {
defer func() {
// recover for each task, worker keeps working
Expand All @@ -222,27 +229,40 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
} else {
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
return h.handleOneItemTask(sctx, task)
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
case sr := <-resultChan:
// sr.Val is always nil.
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
return task, result.Err
if !isVaildForRetry(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, sr.Err
case <-time.After(timeout):
if !isVaildForRetry(task) {
result.Error = errors.New("stats loading timeout")
task.ResultCh <- result
return nil, nil
}
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
}
}

func isVaildForRetry(task *NeededItemTask) bool {
task.Retry++
return task.Retry <= RetryCount
}

func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
defer func() {
// recover for each task, worker keeps working
Expand All @@ -251,8 +271,8 @@ func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.TableItemID}
item := result.Item

item := task.TableItemID
tbl, ok := h.Get(item.TableID)
if !ok {
return result, nil
Expand Down
89 changes: 89 additions & 0 deletions pkg/statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -206,6 +207,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-stmtCtx1.StatsLoad.ResultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
case <-stmtCtx2.StatsLoad.ResultCh:
t.Logf("stmtCtx2.ResultCh should not get anything")
t.FailNow()
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get anything")
t.FailNow()
default:
}

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
Expand All @@ -229,3 +242,79 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}

func TestRetry(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("set @@session.tidb_analyze_version=2")
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()

h := dom.StatsHandle()
neededColumns := make([]model.TableItemID, 1)
neededColumns[0] = model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}
timeout := time.Nanosecond * mathutil.MaxInt

// clear statsCache
h.Clear()
require.NoError(t, dom.StatsHandle().Update(is))

// no stats at beginning
stat := h.GetTableStats(tableInfo)
c, ok := stat.Columns[tableInfo.Columns[2].ID]
require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

stmtCtx1 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
stmtCtx2 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOneFail", "return(true)"))
var (
task1 *handle.NeededItemTask
err1 error
)

for i := 0; i < handle.RetryCount; i++ {
task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get nothing")
t.FailNow()
default:
}
}
result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.NoError(t, err1)
require.Nil(t, result)
select {
case <-task1.ResultCh:
default:
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOneFail"))
}

0 comments on commit d9788b9

Please sign in to comment.