Skip to content

Commit

Permalink
statistics: add upper bound of retry for sync load (#55742)
Browse files Browse the repository at this point in the history
close #52657
  • Loading branch information
hawkingrei authored Aug 29, 2024
1 parent 8369cf5 commit e011cdd
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 32 deletions.
3 changes: 1 addition & 2 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,12 @@ go_test(
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 34,
shard_count = 35,
deps = [
"//config",
"//domain",
"//parser/model",
"//parser/mysql",
"//sessionctx",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics",
Expand Down
67 changes: 40 additions & 27 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import (
"golang.org/x/sync/singleflight"
)

// RetryCount is the max retry count for a sync load task.
// A task's Retry counter is bumped on every failed attempt; once the counter
// exceeds RetryCount the error is reported on the task's ResultCh instead of
// the task being handed back for another retry.
const RetryCount = 3

var globalStatsSyncLoadSingleFlight singleflight.Group

type statsWrapper struct {
Expand All @@ -58,6 +61,7 @@ type NeededItemTask struct {
TableItemID model.TableItemID
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
Retry int
}

// SendLoadRequests sends neededColumns requests
Expand Down Expand Up @@ -210,7 +214,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
// if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
var lastTask *NeededItemTask
for {
task, err := h.HandleOneTask(ctx, lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
task, err := h.HandleOneTask(lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
lastTask = task
if err != nil {
switch err {
Expand All @@ -228,7 +232,10 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
}

// HandleOneTask handles the last task if it is not nil, otherwise it handles a new task from the channel, and returns the current task if it fails somewhere.
func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
// - If the task is handled successfully, return nil, nil.
// - If the task times out, return the task and nil. The caller should retry the timed-out task without sleep.
// - If the task fails and can still be retried, return the task and the error. The caller should retry the failed task with sleep.
// - If the task fails and has used up its retries, send the error to task.ResultCh and return nil, nil.
func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
Expand All @@ -237,7 +244,7 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
}
}()
if lastTask == nil {
task, err = h.drainColTask(sctx, exit)
task, err = h.drainColTask(exit)
if err != nil {
if err != errExit {
logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
Expand All @@ -247,52 +254,60 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
} else {
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
return h.handleOneItemTask(task, readerCtx, ctx)
err := h.handleOneItemTask(task, readerCtx, ctx)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
case sr := <-resultChan:
// sr.Val is always nil.
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
return task, result.Err
if !isVaildForRetry(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, sr.Err
case <-time.After(timeout):
return task, nil
}
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (result *stmtctx.StatsLoadResult, err error) {
// isVaildForRetry records one more failed attempt on task and reports whether
// the task may be retried again, i.e. whether its attempt counter is still
// within RetryCount. Note that every call mutates task.Retry.
func isVaildForRetry(task *NeededItemTask) bool {
	attempts := task.Retry + 1
	task.Retry = attempts
	return attempts <= RetryCount
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.TableItemID}
item := result.Item
item := task.TableItemID
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
return result, nil
return nil
}
wrapper := &statsWrapper{}
if item.IsIndex {
index, ok := tbl.Indices[item.ID]
if !ok || index.IsFullLoad() {
return result, nil
return nil
}
wrapper.idx = index
} else {
col, ok := tbl.Columns[item.ID]
if !ok || col.IsFullLoad() {
return result, nil
return nil
}
wrapper.col = col
}
Expand All @@ -302,8 +317,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
needUpdate := false
wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
if err != nil {
result.Error = err
return result, err
return err
}
if item.IsIndex {
if wrapper.idx != nil {
Expand All @@ -315,10 +329,10 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
}
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
return result, nil
if needUpdate {
h.updateCachedItem(item, wrapper.col, wrapper.idx)
}
return nil, nil
return nil
}

func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) {
Expand Down Expand Up @@ -433,7 +447,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re
}

// drainColTask will hang until a column task can return, and either task or error will be returned.
func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*NeededItemTask, error) {
func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
for {
select {
Expand All @@ -446,7 +460,6 @@ func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*Nee
// if the task has already timed out, no sql is sync-waiting for it,
// so do not handle it just now, put it to another channel with lower priority
if time.Now().After(task.ToTimeout) {
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
continue
}
Expand Down Expand Up @@ -515,12 +528,12 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
return true
return false
}
if !item.IsIndex && colHist != nil {
c, ok := tbl.Columns[item.ID]
if !ok || c.IsFullLoad() {
return true
return false
}
tbl = tbl.Copy()
tbl.Columns[c.ID] = colHist
Expand Down
82 changes: 79 additions & 3 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -206,7 +205,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable(fp.failPath, fp.inTerms))

task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
Expand All @@ -227,7 +226,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
}

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

Expand All @@ -250,3 +249,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}

// TestRetry verifies the retry bound of sync stats loading: while the stats
// read keeps failing, HandleOneTask hands the task back (with the error) for
// handle.RetryCount rounds without notifying the waiter; on the next failure
// it stops retrying, returns nil, nil and delivers the error to the waiting
// statement through its result channel.
func TestRetry(t *testing.T) {
	const failpointName = "github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail"

	originConfig := config.GetGlobalConfig()
	newConfig := config.NewConfig()
	newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
	config.StoreGlobalConfig(newConfig)
	defer config.StoreGlobalConfig(originConfig)
	store, dom := testkit.CreateMockStoreAndDomain(t)

	testKit := testkit.NewTestKit(t, store)
	testKit.MustExec("use test")
	testKit.MustExec("drop table if exists t")
	testKit.MustExec("set @@session.tidb_analyze_version=2")
	testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
	testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

	oriLease := dom.StatsHandle().Lease()
	dom.StatsHandle().SetLease(1)
	defer func() {
		dom.StatsHandle().SetLease(oriLease)
	}()
	testKit.MustExec("analyze table t")

	is := dom.InfoSchema()
	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)
	tableInfo := tbl.Meta()

	h := dom.StatsHandle()

	neededColumns := make([]model.TableItemID, 1)
	neededColumns[0] = model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}
	timeout := time.Nanosecond * mathutil.MaxInt

	// clear statsCache
	h.Clear()
	require.NoError(t, dom.StatsHandle().Update(is))

	// no stats at beginning
	stat := h.GetTableStats(tableInfo)
	c, ok := stat.Columns[tableInfo.Columns[2].ID]
	require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

	stmtCtx1 := &stmtctx.StatementContext{}
	h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
	stmtCtx2 := &stmtctx.StatementContext{}
	h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

	exitCh := make(chan struct{})
	require.NoError(t, failpoint.Enable(failpointName, "return(true)"))
	// Disable the failpoint via defer so that a failed assertion below cannot
	// leave it enabled for the tests that run afterwards.
	defer func() {
		require.NoError(t, failpoint.Disable(failpointName))
	}()

	var (
		task1 *handle.NeededItemTask
		err1  error
	)
	readerCtx := &handle.StatsReaderContext{}
	// Every round within the retry budget must hand the task back together
	// with the error and must not publish anything to the waiter yet.
	for i := 0; i < handle.RetryCount; i++ {
		task1, err1 = h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
		require.Error(t, err1)
		require.NotNil(t, task1)
		select {
		case <-task1.ResultCh:
			t.Fatal("task1.ResultCh should not receive anything before the retries are exhausted")
		default:
		}
	}

	// The retry budget is used up: the task is dropped (nil, nil) and the
	// error is reported through the result channel instead.
	result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
	require.NoError(t, err1)
	require.Nil(t, result)
	for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
		rs1, ok1 := <-resultCh
		// Check the receive succeeded before looking at the received value.
		require.True(t, ok1)
		require.True(t, rs1.Shared)
		require.Error(t, rs1.Val.(stmtctx.StatsLoadResult).Error)
	}
}

0 comments on commit e011cdd

Please sign in to comment.