Skip to content

Commit

Permalink
statistics: fix wrong singleflight implementation for stats' syncload
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
  • Loading branch information
hawkingrei committed Apr 2, 2024
1 parent ac22710 commit 92241bd
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
1 change: 1 addition & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,7 @@ type TableItemID struct {
IsSyncLoadFailed bool
}

// Key is used to generate unique key for TableItemID to use in the syncload
func (t TableItemID) Key() string {
return fmt.Sprintf("%d%d%t%t", t.ID, t.TableID, t.IsIndex, t.IsSyncLoadFailed)
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLo
s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
//s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
return s
}

Expand Down Expand Up @@ -231,13 +230,13 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
} else {
task = lastTask
}
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (interface{}, error) {
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
return s.handleOneItemTask(sctx, task)
})
timeout := task.ToTimeout.Sub(time.Now())
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err != nil {
if result.Err == nil {
slr := *(result.Val.(*stmtctx.StatsLoadResult))
if slr.Error != nil {
return task, slr.Error
Expand All @@ -251,14 +250,20 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
}
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*stmtctx.StatsLoadResult, error) {
result := &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
item := result.Item
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return result, nil
}
var err error
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
Expand Down
9 changes: 3 additions & 6 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,14 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Error(t, err1)
require.NotNil(t, task1)

task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.Nil(t, err2)
require.Nil(t, task2)

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

require.Len(t, stmtCtx1.StatsLoad.ResultCh, 1)
require.Len(t, stmtCtx2.StatsLoad.ResultCh, 1)
task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.NoError(t, err3)
require.Nil(t, task)

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/types/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ go_library(
"//pkg/types",
"//pkg/util",
"//pkg/util/sqlexec",
"@org_golang_x_sync//singleflight",
],
)

0 comments on commit 92241bd

Please sign in to comment.