From ac22710cb0a3da76508c1c0f57c14a49d3e45aae Mon Sep 17 00:00:00 2001
From: Weizhen Wang
Date: Tue, 2 Apr 2024 10:58:10 +0800
Subject: [PATCH] statistics: fix wrong singleflight implementation for stats' syncload

Replace the hand-rolled WorkingColMap bookkeeping (setWorking /
finishWorking) with singleflight.Group. HandleOneTask now runs
handleOneItemTask through Singleflight.DoChan keyed by
TableItemID.Key() and waits for the result until the task's ToTimeout.

handleOneItemTask returns the load result instead of writing it to the
task's ResultCh; HandleOneTask forwards a non-nil, error-free result to
ResultCh.

Signed-off-by: Weizhen Wang
---
 pkg/parser/model/model.go                     |  6 +
 .../handle/syncload/stats_syncload.go         | 81 ++++++++++---------
 pkg/statistics/handle/types/interfaces.go     |  7 +-
 3 files changed, 44 insertions(+), 50 deletions(-)

diff --git a/pkg/parser/model/model.go b/pkg/parser/model/model.go
index 12aedb0671755..40d22dc818cb9 100644
--- a/pkg/parser/model/model.go
+++ b/pkg/parser/model/model.go
@@ -1786,6 +1786,12 @@ type TableItemID struct {
 	IsSyncLoadFailed bool
 }
 
+// Key returns a string identifying the item; it is used as the singleflight key when sync-loading stats.
+// Fields are separated by '#' so adjacent numbers cannot run together (e.g. 1,23 vs 12,3).
+func (t TableItemID) Key() string {
+	return fmt.Sprintf("%d#%d#%t#%t", t.ID, t.TableID, t.IsIndex, t.IsSyncLoadFailed)
+}
+
 // StatsLoadItem represents the load unit for statistics's memory loading.
 type StatsLoadItem struct {
 	TableItemID
diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go
index dddbb6ad44c92..e25b617bcb1a1 100644
--- a/pkg/statistics/handle/syncload/stats_syncload.go
+++ b/pkg/statistics/handle/syncload/stats_syncload.go
@@ -48,7 +48,6 @@ func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLo
 	s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
 	s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
 	s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
-	s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
 	return s
 }
 
@@ -231,24 +230,47 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
 	} else {
 		task = lastTask
 	}
-	return s.handleOneItemTask(sctx, task)
+	// Deduplicate concurrent loads of the same item: callers that share a key
+	// wait on a single in-flight handleOneItemTask call.
+	resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (interface{}, error) {
+		return s.handleOneItemTask(sctx, task)
+	})
+	timeout := time.NewTimer(time.Until(task.ToTimeout))
+	defer timeout.Stop()
+	select {
+	case result := <-resultChan:
+		if result.Err != nil {
+			// The load itself failed; return the task together with the error.
+			return task, result.Err
+		}
+		// handleOneItemTask may return a nil result; there is nothing to deliver then.
+		slr, ok := result.Val.(*stmtctx.StatsLoadResult)
+		if !ok || slr == nil {
+			return nil, nil
+		}
+		if slr.Error != nil {
+			return task, slr.Error
+		}
+		task.ResultCh <- *slr
+		return nil, nil
+	case <-timeout.C:
+		return task, nil
+	}
 }
 
-func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*statstypes.NeededItemTask, error) {
-	result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
+func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*stmtctx.StatsLoadResult, error) {
+	result := &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
 	item := result.Item
 	tbl, ok := s.statsHandle.Get(item.TableID)
 	if !ok {
-		s.writeToResultChan(task.ResultCh, result)
-		return nil, nil
+		return result, nil
 	}
 	var err error
 	wrapper := &statsWrapper{}
 	if item.IsIndex {
 		index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
 		if !loadNeeded {
-			s.writeToResultChan(task.ResultCh, result)
-			return nil, nil
+			return result, nil
 		}
 		if index != nil {
 			wrapper.idxInfo = index.Info
@@ -258,8 +280,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
 	} else {
 		col, loadNeeded := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
 		if !loadNeeded {
-			s.writeToResultChan(task.ResultCh, result)
-			return nil, nil
+			return result, nil
 		}
 		if col != nil {
 			wrapper.colInfo = col.Info
@@ -267,18 +288,12 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
 			wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID)
 		}
 	}
-	// to avoid duplicated handling in concurrent scenario
-	working := s.setWorking(result.Item, task.ResultCh)
-	if !working {
-		s.writeToResultChan(task.ResultCh, result)
-		return nil, nil
-	}
 	t := time.Now()
 	needUpdate := false
 	wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
 	if err != nil {
 		result.Error = err
-		return task, err
+		return result, err
 	}
 	if item.IsIndex {
 		if wrapper.idxInfo != nil {
@@ -291,9 +306,8 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
 	}
 	metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
 	if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
-		s.writeToResultChan(task.ResultCh, result)
+		return result, nil
 	}
-	s.finishWorking(result)
 	return nil, nil
 }
 
@@ -509,32 +523,3 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis
 	s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
 	return true
 }
-
-func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
-	s.StatsLoad.Lock()
-	defer s.StatsLoad.Unlock()
-	chList, ok := s.StatsLoad.WorkingColMap[item]
-	if ok {
-		if chList[0] == resultCh {
-			return true // just return for duplicate setWorking
-		}
-		s.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
-		return false
-	}
-	chList = []chan stmtctx.StatsLoadResult{}
-	chList = append(chList, resultCh)
-	s.StatsLoad.WorkingColMap[item] = chList
-	return true
-}
-
-func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) {
-	s.StatsLoad.Lock()
-	defer s.StatsLoad.Unlock()
-	if chList, ok := s.StatsLoad.WorkingColMap[result.Item]; ok {
-		list := chList[1:]
-		for _, ch := range list {
-			s.writeToResultChan(ch, result)
-		}
-	}
-	delete(s.StatsLoad.WorkingColMap, result.Item)
-}
diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go
index 031760fca7ce9..93ccdbb42d1c3 100644
--- a/pkg/statistics/handle/types/interfaces.go
+++ b/pkg/statistics/handle/types/interfaces.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/sqlexec"
+	"golang.org/x/sync/singleflight"
 )
 
 // StatsGC is used to GC unnecessary stats.
@@ -375,8 +376,10 @@ type NeededItemTask struct {
 type StatsLoad struct {
 	NeededItemsCh  chan *NeededItemTask
 	TimeoutItemsCh chan *NeededItemTask
-	WorkingColMap  map[model.TableItemID][]chan stmtctx.StatsLoadResult
-	SubCtxs        []sessionctx.Context
+
+	// Singleflight collapses concurrent loads of the same item (keyed by TableItemID.Key) into one call.
+	Singleflight singleflight.Group
+	SubCtxs      []sessionctx.Context
 	sync.Mutex
 }
 