Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: fix wrong singleflight implementation for stats' syncload #52301

Merged
merged 6 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,12 +1786,22 @@ type TableItemID struct {
IsSyncLoadFailed bool
}

// Key returns the string that identifies this TableItemID in the stats
// syncload path. It joins ID, TableID and IsIndex with '#', in that order.
func (t TableItemID) Key() string {
	const layout = "%d#%d#%t"
	return fmt.Sprintf(layout, t.ID, t.TableID, t.IsIndex)
}

// StatsLoadItem represents the load unit for loading statistics into memory.
// It embeds TableItemID, so that type's fields and methods are promoted onto StatsLoadItem.
type StatsLoadItem struct {
TableItemID
// FullLoad is included in the value returned by Key.
// NOTE(review): presumably requests the complete statistics for the item rather than a partial set; confirm against the sync loader.
FullLoad bool
}

// Key returns the string that identifies this StatsLoadItem in the stats
// syncload path: the embedded TableItemID's key followed by '#' and FullLoad.
func (s StatsLoadItem) Key() string {
	itemKey := s.TableItemID.Key()
	return fmt.Sprintf("%s#%t", itemKey, s.FullLoad)
}

// PolicyRefInfo is the struct to refer the placement policy.
type PolicyRefInfo struct {
ID int64 `json:"id"`
Expand Down
81 changes: 32 additions & 49 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLo
s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
return s
}

Expand Down Expand Up @@ -231,24 +230,45 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
} else {
task = lastTask
}
return s.handleOneItemTask(sctx, task)
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
return s.handleOneItemTask(sctx, task)
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
return nil, nil
}
return task, result.Err
case <-time.After(timeout):
return task, nil
}
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*statstypes.NeededItemTask, error) {
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
item := result.Item
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
var err error
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
if index != nil {
wrapper.idxInfo = index.Info
Expand All @@ -258,27 +278,20 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
} else {
col, loadNeeded := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
if col != nil {
wrapper.colInfo = col.Info
} else {
wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID)
}
}
// to avoid duplicated handling in concurrent scenario
working := s.setWorking(result.Item, task.ResultCh)
if !working {
s.writeToResultChan(task.ResultCh, result)
return nil, nil
}
t := time.Now()
needUpdate := false
wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
if err != nil {
result.Error = err
return task, err
return result, err
}
if item.IsIndex {
if wrapper.idxInfo != nil {
Expand All @@ -291,9 +304,8 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
s.writeToResultChan(task.ResultCh, result)
return result, nil
}
s.finishWorking(result)
return nil, nil
}

Expand Down Expand Up @@ -509,32 +521,3 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis
s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
return true
}

// setWorking records resultCh as interested in item and reports whether the
// caller is the one that should go on to load the item.
//
// It returns true when no entry existed for item (resultCh becomes the first
// channel in the list) or when resultCh already is the first channel. In every
// other case resultCh is appended to the item's list and false is returned.
// The StatsLoad mutex is held for the whole call.
func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
	s.StatsLoad.Lock()
	defer s.StatsLoad.Unlock()

	registered, exists := s.StatsLoad.WorkingColMap[item]
	if !exists {
		// First request for this item: start a new list headed by resultCh.
		s.StatsLoad.WorkingColMap[item] = []chan stmtctx.StatsLoadResult{resultCh}
		return true
	}
	if registered[0] == resultCh {
		// Repeated call from the channel that already heads the list.
		return true
	}
	s.StatsLoad.WorkingColMap[item] = append(registered, resultCh)
	return false
}

// finishWorking delivers result to every channel registered for result.Item
// except the first one, then removes the item's entry from WorkingColMap.
// The entry is deleted whether or not it existed. The StatsLoad mutex is held
// for the whole call, including the channel writes.
func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) {
	s.StatsLoad.Lock()
	defer s.StatsLoad.Unlock()

	key := result.Item
	if registered, found := s.StatsLoad.WorkingColMap[key]; found {
		// Index 0 is skipped; only the channels queued after it are notified here.
		for _, waiter := range registered[1:] {
			s.writeToResultChan(waiter, result)
		}
	}
	delete(s.StatsLoad.WorkingColMap, key)
}
9 changes: 3 additions & 6 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,14 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Error(t, err1)
require.NotNil(t, task1)

task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.Nil(t, err2)
require.Nil(t, task2)

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

require.Len(t, stmtCtx1.StatsLoad.ResultCh, 1)
require.Len(t, stmtCtx2.StatsLoad.ResultCh, 1)
task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err3)
require.Nil(t, task)

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/types/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ go_library(
"//pkg/types",
"//pkg/util",
"//pkg/util/sqlexec",
"@org_golang_x_sync//singleflight",
],
)
24 changes: 23 additions & 1 deletion pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"golang.org/x/sync/singleflight"
)

// StatsGC is used to GC unnecessary stats.
Expand Down Expand Up @@ -372,10 +373,31 @@ type NeededItemTask struct {
}

// StatsLoad holds the shared state used to load stats concurrently.
// TODO(hawkingrei): The current implementation of loading statistics is flawed.
// Tasks that require loading statistics are enqueued into a channel, and workers
// retrieve tasks from that channel to process them. Only after a worker has picked
// up a task is the singleflight mechanism used to filter out duplicates. As a result,
// not all duplicate tasks are filtered out: only duplicates among the tasks that the
// workers are handling at the same time are, so deduplication is bounded by the number of workers.
//
// Instead, all tasks should first be filtered through singleflight, as shown in the diagram,
// and only then be handed to the workers that load the stats.
//
// ┌─────────▼──────────▼─────────────▼──────────────▼────────────────▼────────────────────┐
// │ │
// │ singleflight │
// │ │
// └───────────────────────────────────────────────────────────────────────────────────────┘
//
// │ │
// ┌────────────▼──────┐ ┌───────▼───────────┐
// │ │ │ │
// │ syncload worker │ │ syncload worker │
// │ │ │ │
// └───────────────────┘ └───────────────────┘
type StatsLoad struct {
// NeededItemsCh is created by NewStatsSyncLoad with capacity cfg.Performance.StatsLoadQueueSize.
// NOTE(review): presumably the queue of pending load tasks that workers read from; confirm.
NeededItemsCh chan *NeededItemTask
// TimeoutItemsCh is created with the same capacity as NeededItemsCh.
// NOTE(review): presumably holds tasks that timed out and need to be retried; confirm.
TimeoutItemsCh chan *NeededItemTask
// WorkingColMap maps an item to the result channels registered for it by setWorking;
// finishWorking notifies every channel after the first and then deletes the entry.
WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
// Singleflight is used by HandleOneTask (via DoChan, keyed by task.Item.Key()) to share one load among identical items.
Singleflight singleflight.Group
// SubCtxs is allocated by NewStatsSyncLoad with length cfg.Performance.StatsLoadConcurrency.
// NOTE(review): presumably one session context per worker; confirm.
SubCtxs []sessionctx.Context
// The embedded mutex is taken by setWorking and finishWorking while they access WorkingColMap.
sync.Mutex
}
Expand Down