Skip to content

Commit

Permalink
statistics: fix wrong singleflight implementation for stats' syncload
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
  • Loading branch information
hawkingrei committed Apr 2, 2024
1 parent 3cfea6a commit ac22710
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 50 deletions.
4 changes: 4 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,10 @@ type TableItemID struct {
IsSyncLoadFailed bool
}

// Key returns a string that identifies this item, built from its ID, TableID,
// IsIndex and IsSyncLoadFailed fields. It is intended to be used as an opaque
// deduplication key, not parsed.
//
// The fields are joined with '#' so that distinct items can never render to the
// same key. Without a separator the two integers run together, e.g.
// {ID: 1, TableID: 23} and {ID: 12, TableID: 3} would both start with "123".
func (t TableItemID) Key() string {
	return fmt.Sprintf("%d#%d#%t#%t", t.ID, t.TableID, t.IsIndex, t.IsSyncLoadFailed)
}

// StatsLoadItem represents the unit of work for loading statistics into memory.
type StatsLoadItem struct {
TableItemID
Expand Down
74 changes: 26 additions & 48 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLo
s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
//s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
return s
}

Expand Down Expand Up @@ -231,24 +231,39 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
} else {
task = lastTask
}
return s.handleOneItemTask(sctx, task)
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (interface{}, error) {
return s.handleOneItemTask(sctx, task)
})
timeout := task.ToTimeout.Sub(time.Now())
select {
case result := <-resultChan:
if result.Err != nil {
slr := *(result.Val.(*stmtctx.StatsLoadResult))
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- slr
return nil, nil
}
return task, result.Err
case <-time.After(timeout):
return task, nil
}
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*statstypes.NeededItemTask, error) {
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*stmtctx.StatsLoadResult, error) {
result := &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
item := result.Item
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
var err error
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
if index != nil {
wrapper.idxInfo = index.Info
Expand All @@ -258,27 +273,20 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
} else {
col, loadNeeded := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
if col != nil {
wrapper.colInfo = col.Info
} else {
wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID)
}
}
// to avoid duplicated handling in concurrent scenario
working := s.setWorking(result.Item, task.ResultCh)
if !working {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
}
t := time.Now()
needUpdate := false
wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
if err != nil {
result.Error = err
return task, err
return result, err
}
if item.IsIndex {
if wrapper.idxInfo != nil {
Expand All @@ -291,9 +299,8 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
s.writeToResultChan(task.ResultCh, result)
return result, nil
}
s.finishWorking(result)
return nil, nil
}

Expand Down Expand Up @@ -509,32 +516,3 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis
s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
return true
}

// setWorking records resultCh in WorkingColMap under item and reports whether
// the caller should go on to do the work for that item.
//
// It returns true when it creates the entry for item, or when resultCh is
// already the first channel recorded for item (a repeated call by the same
// caller). Otherwise resultCh is appended to the item's channel list and the
// function returns false. The StatsLoad mutex is held for the whole call.
func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
	s.StatsLoad.Lock()
	defer s.StatsLoad.Unlock()

	waiters, exists := s.StatsLoad.WorkingColMap[item]
	if !exists {
		// No entry yet: start a new list with this channel at the front.
		s.StatsLoad.WorkingColMap[item] = []chan stmtctx.StatsLoadResult{resultCh}
		return true
	}
	if waiters[0] == resultCh {
		// Same channel that created the entry; nothing to record again.
		return true
	}
	s.StatsLoad.WorkingColMap[item] = append(waiters, resultCh)
	return false
}

// finishWorking hands result to every channel recorded for result.Item except
// the first one, via writeToResultChan, and then removes the item's entry from
// WorkingColMap. The entry is removed even when no channels were recorded.
// The StatsLoad mutex is held for the whole call.
func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) {
	s.StatsLoad.Lock()
	defer s.StatsLoad.Unlock()

	waiters, found := s.StatsLoad.WorkingColMap[result.Item]
	if found {
		// The channel at index 0 is skipped; only the later ones are written here.
		for _, waiter := range waiters[1:] {
			s.writeToResultChan(waiter, result)
		}
	}
	delete(s.StatsLoad.WorkingColMap, result.Item)
}
6 changes: 4 additions & 2 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"golang.org/x/sync/singleflight"
)

// StatsGC is used to GC unnecessary stats.
Expand Down Expand Up @@ -375,8 +376,9 @@ type NeededItemTask struct {
type StatsLoad struct {
NeededItemsCh chan *NeededItemTask
TimeoutItemsCh chan *NeededItemTask
WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
SubCtxs []sessionctx.Context

Singleflight singleflight.Group
SubCtxs []sessionctx.Context
sync.Mutex
}

Expand Down

0 comments on commit ac22710

Please sign in to comment.