From ac22710cb0a3da76508c1c0f57c14a49d3e45aae Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 2 Apr 2024 10:58:10 +0800 Subject: [PATCH 1/6] statistics: fix wrong singleflight implementation for stats' syncload Signed-off-by: Weizhen Wang --- pkg/parser/model/model.go | 4 + .../handle/syncload/stats_syncload.go | 74 +++++++------------ pkg/statistics/handle/types/interfaces.go | 6 +- 3 files changed, 34 insertions(+), 50 deletions(-) diff --git a/pkg/parser/model/model.go b/pkg/parser/model/model.go index 12aedb0671755..40d22dc818cb9 100644 --- a/pkg/parser/model/model.go +++ b/pkg/parser/model/model.go @@ -1786,6 +1786,10 @@ type TableItemID struct { IsSyncLoadFailed bool } +func (t TableItemID) Key() string { + return fmt.Sprintf("%d%d%t%t", t.ID, t.TableID, t.IsIndex, t.IsSyncLoadFailed) +} + // StatsLoadItem represents the load unit for statistics's memory loading. type StatsLoadItem struct { TableItemID diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index dddbb6ad44c92..e25b617bcb1a1 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -48,7 +48,7 @@ func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLo s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) - s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{} + //s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{} return s } @@ -231,24 +231,39 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } else { task = lastTask } - return s.handleOneItemTask(sctx, task) + resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (interface{}, error) { + return s.handleOneItemTask(sctx, task) + }) + timeout := task.ToTimeout.Sub(time.Now()) + select { + case result := <-resultChan: + if result.Err != nil { + slr := *(result.Val.(*stmtctx.StatsLoadResult)) + if slr.Error != nil { + return task, slr.Error + } + task.ResultCh <- slr + return nil, nil + } + return task, result.Err + case <-time.After(timeout): + return task, nil + } } -func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*statstypes.NeededItemTask, error) { - result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID} +func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*stmtctx.StatsLoadResult, error) { + result := &stmtctx.StatsLoadResult{Item: task.Item.TableItemID} item := result.Item tbl, ok := s.statsHandle.Get(item.TableID) if !ok { - s.writeToResultChan(task.ResultCh, result) - return nil, nil + return result, nil } var err error wrapper := &statsWrapper{} if item.IsIndex { index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID) if !loadNeeded { - s.writeToResultChan(task.ResultCh, result) - return nil, nil + return result, nil } if index != nil { wrapper.idxInfo = index.Info @@ -258,8 +273,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty } else { col, loadNeeded := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad) if !loadNeeded { - s.writeToResultChan(task.ResultCh, result) - return nil, nil + return result, nil } if col != nil { wrapper.colInfo = col.Info @@ -267,18 +281,12 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID) } } - // to avoid duplicated handling in concurrent scenario - working := s.setWorking(result.Item, task.ResultCh) - if !working { - s.writeToResultChan(task.ResultCh, result) - return nil, nil - } t := time.Now() needUpdate := false wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad) if err != nil { result.Error = err - return task, err + return result, err } if item.IsIndex { if wrapper.idxInfo != nil { @@ -291,9 +299,8 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty } metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) { - s.writeToResultChan(task.ResultCh, result) + return result, nil } - s.finishWorking(result) return nil, nil } @@ -509,32 +516,3 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil) return true } - -func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool { - s.StatsLoad.Lock() - defer s.StatsLoad.Unlock() - chList, ok := s.StatsLoad.WorkingColMap[item] - if ok { - if chList[0] == resultCh { - return true // just return for duplicate setWorking - } - s.StatsLoad.WorkingColMap[item] = append(chList, resultCh) - return false - } - chList = []chan stmtctx.StatsLoadResult{} - chList = append(chList, resultCh) - s.StatsLoad.WorkingColMap[item] = chList - return true -} - -func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) { - s.StatsLoad.Lock() - defer s.StatsLoad.Unlock() - if chList, ok := s.StatsLoad.WorkingColMap[result.Item]; ok { - list := chList[1:] - for _, ch := range list { - s.writeToResultChan(ch, result) - } - } - delete(s.StatsLoad.WorkingColMap, result.Item) -} diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 031760fca7ce9..93ccdbb42d1c3 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/sqlexec" + "golang.org/x/sync/singleflight" ) // StatsGC is used to GC unnecessary stats. @@ -375,8 +376,9 @@ type NeededItemTask struct { type StatsLoad struct { NeededItemsCh chan *NeededItemTask TimeoutItemsCh chan *NeededItemTask - WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult - SubCtxs []sessionctx.Context + + Singleflight singleflight.Group + SubCtxs []sessionctx.Context sync.Mutex } From 92241bd9f97002ea80de86f5e4eb452e3dc2c506 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 2 Apr 2024 11:06:02 +0800 Subject: [PATCH 2/6] statistics: fix wrong singleflight implementation for stats' syncload Signed-off-by: Weizhen Wang --- pkg/parser/model/model.go | 1 + .../handle/syncload/stats_syncload.go | 19 ++++++++++++------- .../handle/syncload/stats_syncload_test.go | 9 +++------ pkg/statistics/handle/types/BUILD.bazel | 1 + 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pkg/parser/model/model.go b/pkg/parser/model/model.go index 40d22dc818cb9..3ed3385066ad5 100644 --- a/pkg/parser/model/model.go +++ b/pkg/parser/model/model.go @@ -1786,6 +1786,7 @@ type TableItemID struct { IsSyncLoadFailed bool } +// Key is used to generate unique key for TableItemID to use in the syncload func (t TableItemID) Key() string { return fmt.Sprintf("%d%d%t%t", t.ID, t.TableID, t.IsIndex, t.IsSyncLoadFailed) } diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index e25b617bcb1a1..b24f48c870597 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -48,7 +48,6 @@ func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLo s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) - //s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{} return s } @@ -231,13 +230,13 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } else { task = lastTask } - resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (interface{}, error) { + resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) { return s.handleOneItemTask(sctx, task) }) - timeout := task.ToTimeout.Sub(time.Now()) + timeout := time.Until(task.ToTimeout) select { case result := <-resultChan: - if result.Err != nil { + if result.Err == nil { slr := *(result.Val.(*stmtctx.StatsLoadResult)) if slr.Error != nil { return task, slr.Error @@ -251,14 +250,20 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } } -func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (*stmtctx.StatsLoadResult, error) { - result := &stmtctx.StatsLoadResult{Item: task.Item.TableItemID} +func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) { + defer func() { + // recover for each task, worker keeps working + if r := recover(); r != nil { + logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack")) + err = errors.Errorf("stats loading panicked: %v", r) + } + }() + result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID} item := result.Item tbl, ok := s.statsHandle.Get(item.TableID) if !ok { return result, nil } - var err error wrapper := &statsWrapper{} if item.IsIndex { index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID) diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 2c2768bc4f849..01706d668d125 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -207,17 +207,14 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Error(t, err1) require.NotNil(t, task1) - task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh) - require.Nil(t, err2) - require.Nil(t, task2) - require.NoError(t, failpoint.Disable(fp.failPath)) task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) require.NoError(t, err3) require.Nil(t, task3) - require.Len(t, stmtCtx1.StatsLoad.ResultCh, 1) - require.Len(t, stmtCtx2.StatsLoad.ResultCh, 1) + task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh) + require.NoError(t, err3) + require.Nil(t, task) rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh require.True(t, ok1) diff --git a/pkg/statistics/handle/types/BUILD.bazel b/pkg/statistics/handle/types/BUILD.bazel index df7a6ea2acfa1..328d1a75b1159 100644 --- a/pkg/statistics/handle/types/BUILD.bazel +++ b/pkg/statistics/handle/types/BUILD.bazel @@ -17,5 +17,6 @@ go_library( "//pkg/types", "//pkg/util", "//pkg/util/sqlexec", + "@org_golang_x_sync//singleflight", ], ) From 5e2b69f593a4fe2751ff15f8540b1d68e25ea9f2 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 2 Apr 2024 15:50:22 +0800 Subject: [PATCH 3/6] 52294 --- pkg/statistics/handle/types/interfaces.go | 26 ++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 93ccdbb42d1c3..5bffd2a196327 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -373,12 +373,32 @@ type NeededItemTask struct { } // StatsLoad is used to load stats concurrently +// TODO(hawkingrei): Our implementation of loading statistics is flawed. +// Currently, we enqueue tasks that require loading statistics into a channel, +// from which workers retrieve tasks to process. Then, using the singleflight mechanism, +// we filter out duplicate tasks. However, the issue with this approach is that it does +// not filter out all duplicate tasks, but only the duplicates within the number of workers. +// Such an implementation is not reasonable. +// +// We should first filter all tasks through singleflight as shown in the diagram, and then use workers to load stats. +// +// ┌─────────▼──────────▼─────────────▼──────────────▼────────────────▼────────────────────┐ +// │ │ +// │ singleflight │ +// │ │ +// └───────────────────────────────────────────────────────────────────────────────────────┘ +// +// │ │ +// ┌────────────▼──────┐ ┌───────▼───────────┐ +// │ │ │ │ +// │ syncload worker │ │ syncload worker │ +// │ │ │ │ +// └───────────────────┘ └───────────────────┘ type StatsLoad struct { NeededItemsCh chan *NeededItemTask TimeoutItemsCh chan *NeededItemTask - - Singleflight singleflight.Group - SubCtxs []sessionctx.Context + Singleflight singleflight.Group + SubCtxs []sessionctx.Context sync.Mutex } From ca20f6e99fdae537819f8ae2f4f038f54a4dfe28 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 2 Apr 2024 17:49:43 +0800 Subject: [PATCH 4/6] 52294 --- pkg/parser/model/model.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/parser/model/model.go b/pkg/parser/model/model.go index 3ed3385066ad5..be0f0c6a5cd76 100644 --- a/pkg/parser/model/model.go +++ b/pkg/parser/model/model.go @@ -1788,7 +1788,7 @@ type TableItemID struct { // Key is used to generate unique key for TableItemID to use in the syncload func (t TableItemID) Key() string { - return fmt.Sprintf("%d%d%t%t", t.ID, t.TableID, t.IsIndex, t.IsSyncLoadFailed) + return fmt.Sprintf("%d#%d#%t", t.ID, t.TableID, t.IsIndex) } // StatsLoadItem represents the load unit for statistics's memory loading. From 214c35cf1410d372d99e416fd8acdd36811a8624 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 2 Apr 2024 18:00:05 +0800 Subject: [PATCH 5/6] 52294 --- pkg/parser/model/model.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/parser/model/model.go b/pkg/parser/model/model.go index be0f0c6a5cd76..88056ae9a40ae 100644 --- a/pkg/parser/model/model.go +++ b/pkg/parser/model/model.go @@ -1797,6 +1797,11 @@ type StatsLoadItem struct { FullLoad bool } +// Key is used to generate unique key for TableItemID to use in the syncload +func (s StatsLoadItem) Key() string { + return fmt.Sprintf("%s#%t", s.TableItemID.Key(), s.FullLoad) +} + // PolicyRefInfo is the struct to refer the placement policy. type PolicyRefInfo struct { ID int64 `json:"id"` From 2d9517c67089a525d360134aa477970be6de5b95 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 3 Apr 2024 17:05:02 +0800 Subject: [PATCH 6/6] Update pkg/statistics/handle/syncload/stats_syncload.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 二手掉包工程师 --- pkg/statistics/handle/syncload/stats_syncload.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index b24f48c870597..4befd5470f5be 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -237,11 +237,11 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty select { case result := <-resultChan: if result.Err == nil { - slr := *(result.Val.(*stmtctx.StatsLoadResult)) + slr := result.Val.(*stmtctx.StatsLoadResult) if slr.Error != nil { return task, slr.Error } - task.ResultCh <- slr + task.ResultCh <- *slr return nil, nil } return task, result.Err