Skip to content

Commit

Permalink
statistics: add upper bound of retry for sync load (#52658)
Browse files Browse the repository at this point in the history
close #52657
  • Loading branch information
hawkingrei authored Apr 22, 2024
1 parent 4b6e8ee commit cfcc770
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 27 deletions.
4 changes: 3 additions & 1 deletion pkg/statistics/handle/syncload/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ go_test(
srcs = ["stats_syncload_test.go"],
flaky = True,
race = "on",
shard_count = 4,
shard_count = 5,
deps = [
":syncload",
"//pkg/config",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/statistics/handle/types",
"//pkg/testkit",
"//pkg/util/mathutil",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
68 changes: 42 additions & 26 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
"go.uber.org/zap"
)

// RetryCount is the max retry count for a sync load task. Once a task's Retry
// counter has gone past this value the task is no longer handed back for
// another attempt; its error is reported through the task's ResultCh instead.
const RetryCount = 3

type statsSyncLoad struct {
statsHandle statstypes.StatsHandle
StatsLoad statstypes.StatsLoad
Expand Down Expand Up @@ -204,6 +207,9 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{
}

// HandleOneTask handles the last task if it is not nil, otherwise it handles a new task from the channel. It returns the current task if handling fails somewhere.
// - If the task is handled successfully, return nil, nil.
// - If the task times out, return the task and nil. The caller should retry the timed-out task without sleep.
// - If the task fails, return the task and the error. The caller should retry the failed task with sleep.
// - If the task fails or times out after its retries are exhausted (see RetryCount), the error is sent to the task's ResultCh and nil, nil is returned.
func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
defer func() {
// recover for each task, worker keeps working
Expand All @@ -223,46 +229,59 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
} else {
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
return s.handleOneItemTask(sctx, task)
err := s.handleOneItemTask(sctx, task)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
case sr := <-resultChan:
// sr.Val is always nil.
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
return task, result.Err
if !isVaildForRetry(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, sr.Err
case <-time.After(timeout):
if !isVaildForRetry(task) {
result.Error = errors.New("stats loading timeout")
task.ResultCh <- result
return nil, nil
}
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
}
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
// isVaildForRetry records one more attempt on the task and reports whether the
// task may still be retried, i.e. whether its attempt counter has not yet gone
// past RetryCount. Calling it mutates task.Retry.
func isVaildForRetry(task *statstypes.NeededItemTask) bool {
	attempts := task.Retry + 1
	task.Retry = attempts
	return attempts <= RetryCount
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
item := result.Item
item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return result, nil
return nil
}
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
return result, nil
return nil
}
if index != nil {
wrapper.idxInfo = index.Info
Expand All @@ -272,7 +291,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
} else {
col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
return result, nil
return nil
}
if col != nil {
wrapper.colInfo = col.Info
Expand All @@ -288,18 +307,15 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0),
IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()),
}
if s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
return result, nil
}
return nil, nil
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
return nil
}
}
t := time.Now()
needUpdate := false
wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
if err != nil {
result.Error = err
return result, err
return err
}
if item.IsIndex {
if wrapper.idxInfo != nil {
Expand All @@ -311,10 +327,10 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
}
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
return result, nil
if needUpdate {
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
}
return nil, nil
return nil
}

// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
Expand Down Expand Up @@ -492,14 +508,14 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis
// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return true
return false
}
if !item.IsIndex && colHist != nil {
c, ok := tbl.Columns[item.ID]
// - If the stats is fully loaded,
// - If the stats is meta-loaded and we also just need the meta.
if ok && (c.IsFullLoad() || !fullLoaded) {
return true
return false
}
tbl = tbl.Copy()
tbl.Columns[item.ID] = colHist
Expand Down
91 changes: 91 additions & 0 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics/handle/syncload"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -206,6 +208,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-stmtCtx1.StatsLoad.ResultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
case <-stmtCtx2.StatsLoad.ResultCh:
t.Logf("stmtCtx2.ResultCh should not get anything")
t.FailNow()
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get anything")
t.FailNow()
default:
}

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
Expand All @@ -229,3 +243,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}

// TestRetry verifies the upper bound of retries for a sync load task: while the
// read failpoint is active, HandleOneTask must hand the failing task back for
// retry syncload.RetryCount times without publishing anything on its ResultCh,
// and the next attempt must give up and deliver the error through ResultCh.
func TestRetry(t *testing.T) {
	originConfig := config.GetGlobalConfig()
	newConfig := config.NewConfig()
	newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
	config.StoreGlobalConfig(newConfig)
	defer config.StoreGlobalConfig(originConfig)
	store, dom := testkit.CreateMockStoreAndDomain(t)

	testKit := testkit.NewTestKit(t, store)
	testKit.MustExec("use test")
	testKit.MustExec("drop table if exists t")
	testKit.MustExec("set @@session.tidb_analyze_version=2")
	testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
	testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

	oriLease := dom.StatsHandle().Lease()
	dom.StatsHandle().SetLease(1)
	defer func() {
		dom.StatsHandle().SetLease(oriLease)
	}()
	testKit.MustExec("analyze table t")

	is := dom.InfoSchema()
	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)
	tableInfo := tbl.Meta()

	h := dom.StatsHandle()

	neededColumns := make([]model.StatsLoadItem, 1)
	neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true}
	timeout := time.Nanosecond * mathutil.MaxInt

	// clear statsCache
	h.Clear()
	require.NoError(t, dom.StatsHandle().Update(is))

	// no stats at beginning
	stat := h.GetTableStats(tableInfo)
	c, ok := stat.Columns[tableInfo.Columns[2].ID]
	require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

	stmtCtx1 := stmtctx.NewStmtCtx()
	h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
	stmtCtx2 := stmtctx.NewStmtCtx()
	h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

	exitCh := make(chan struct{})
	const failPath = "github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"
	require.NoError(t, failpoint.Enable(failPath, "return(true)"))
	// Disable the failpoint even when an assertion below aborts the test, so it
	// cannot leak into other tests running in the same process.
	defer func() {
		require.NoError(t, failpoint.Disable(failPath))
	}()
	var (
		task1 *types.NeededItemTask
		err1  error
	)

	// Every attempt within the retry budget must return the task together with
	// the error and must not publish a result yet.
	for i := 0; i < syncload.RetryCount; i++ {
		task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
		require.Error(t, err1)
		require.NotNil(t, task1)
		select {
		case <-task1.ResultCh:
			t.Fatal("task1.ResultCh should not get anything")
		default:
		}
	}

	// The attempt after the budget is exhausted gives up: nothing is returned
	// for retry and the failure is delivered to the waiter through ResultCh.
	result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
	require.NoError(t, err1)
	require.Nil(t, result)
	select {
	case loadResult := <-task1.ResultCh:
		require.Error(t, loadResult.Error)
	default:
		t.Fatal("task1.ResultCh should get a result")
	}
}
1 change: 1 addition & 0 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ type NeededItemTask struct {
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
Item model.StatsLoadItem
Retry int
}

// StatsLoad is used to load stats concurrently
Expand Down

0 comments on commit cfcc770

Please sign in to comment.