Skip to content

Commit

Permalink
statistics: add upper bound of retry for sync load (pingcap#52658) (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and hawkingrei committed Jul 1, 2024
1 parent ceff3eb commit 30911a2
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 22 deletions.
58 changes: 36 additions & 22 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
"golang.org/x/sync/singleflight"
)

// RetryCount is the max retry count for a sync load task.
const RetryCount = 3

type statsWrapper struct {
col *statistics.Column
idx *statistics.Index
Expand All @@ -55,6 +58,7 @@ type NeededItemTask struct {
TableItemID model.TableItemID
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
Retry int
}

// SendLoadRequests send neededColumns requests
Expand Down Expand Up @@ -213,6 +217,9 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
}

// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
// - If the task is handled successfully, return nil, nil.
// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep.
// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep.
func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
defer func() {
// recover for each task, worker keeps working
Expand All @@ -232,52 +239,60 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
} else {
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
return h.handleOneItemTask(task, readerCtx, ctx)
err := h.handleOneItemTask(task, readerCtx, ctx)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
case sr := <-resultChan:
// sr.Val is always nil.
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
return task, result.Err
if !isVaildForRetry(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, sr.Err
case <-time.After(timeout):
return task, nil
}
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (result *stmtctx.StatsLoadResult, err error) {
func isVaildForRetry(task *NeededItemTask) bool {
task.Retry++
return task.Retry <= RetryCount
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.TableItemID}
item := result.Item
item := task.TableItemID
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
return result, nil
return nil
}
wrapper := &statsWrapper{}
if item.IsIndex {
index, ok := tbl.Indices[item.ID]
if !ok || index.IsFullLoad() {
return result, nil
return nil
}
wrapper.idx = index
} else {
col, ok := tbl.Columns[item.ID]
if !ok || col.IsFullLoad() {
return result, nil
return nil
}
wrapper.col = col
}
Expand All @@ -287,8 +302,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
needUpdate := false
wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
if err != nil {
result.Error = err
return result, err
return err
}
if item.IsIndex {
if wrapper.idx != nil {
Expand All @@ -300,10 +314,10 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
}
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
return result, nil
if needUpdate {
h.updateCachedItem(item, wrapper.col, wrapper.idx)
}
return nil, nil
return nil
}

func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) {
Expand Down Expand Up @@ -487,12 +501,12 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
return true
return false
}
if !item.IsIndex && colHist != nil {
c, ok := tbl.Columns[item.ID]
if !ok || c.IsFullLoad() {
return true
return false
}
tbl = tbl.Copy()
tbl.Columns[c.ID] = colHist
Expand Down
90 changes: 90 additions & 0 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,19 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Error(t, err1)
require.NotNil(t, task1)

select {
case <-stmtCtx1.StatsLoad.ResultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
case <-stmtCtx2.StatsLoad.ResultCh:
t.Logf("stmtCtx2.ResultCh should not get anything")
t.FailNow()
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get anything")
t.FailNow()
default:
}

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
Expand All @@ -231,3 +244,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}

func TestRetry(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("set @@session.tidb_analyze_version=2")
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()

h := dom.StatsHandle()

neededColumns := make([]model.TableItemID, 1)
neededColumns[0] = model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}
timeout := time.Nanosecond * mathutil.MaxInt

// clear statsCache
h.Clear()
require.NoError(t, dom.StatsHandle().Update(is))

// no stats at beginning
stat := h.GetTableStats(tableInfo)
c, ok := stat.Columns[tableInfo.Columns[2].ID]
require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

stmtCtx1 := &stmtctx.StatementContext{}
h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
stmtCtx2 := &stmtctx.StatementContext{}
h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail", "return(true)"))
var (
task1 *handle.NeededItemTask
err1 error
)
readerCtx := &handle.StatsReaderContext{}
for i := 0; i < handle.RetryCount; i++ {
task1, err1 = h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get nothing")
t.FailNow()
default:
}
}
result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err1)
require.Nil(t, result)
select {
case <-task1.ResultCh:
default:
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail"))
}

0 comments on commit 30911a2

Please sign in to comment.