From fc51cc73eb4c206708f6fda3f25ebfae3272a65d Mon Sep 17 00:00:00 2001
From: Yuanjia Zhang
Date: Thu, 19 Oct 2023 16:15:37 +0800
Subject: [PATCH] planner: move more methods from StatsHandle to its sub-packages (#47760)

ref pingcap/tidb#46905
---
 pkg/domain/domain.go                          |   2 +-
 .../core/casetest/planstats/BUILD.bazel       |   2 +-
 .../casetest/planstats/plan_stats_test.go     |   4 +-
 pkg/statistics/handle/BUILD.bazel             |  13 +-
 pkg/statistics/handle/handle.go               |  15 +-
 pkg/statistics/handle/syncload/BUILD.bazel    |  44 ++++++
 .../stats_syncload.go}                        | 149 +++++++++---------
 .../stats_syncload_test.go}                   |  22 +--
 pkg/statistics/handle/util/BUILD.bazel        |   2 +
 pkg/statistics/handle/util/interfaces.go      |  41 +++++
 10 files changed, 182 insertions(+), 112 deletions(-)
 create mode 100644 pkg/statistics/handle/syncload/BUILD.bazel
 rename pkg/statistics/handle/{handle_hist.go => syncload/stats_syncload.go} (72%)
 rename pkg/statistics/handle/{handle_hist_test.go => syncload/stats_syncload_test.go} (92%)

diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go
index 46e777b6f941d..72e174fb0a77b 100644
--- a/pkg/domain/domain.go
+++ b/pkg/domain/domain.go
@@ -2268,7 +2268,7 @@ func quitStatsOwner(do *Domain, mgr owner.Manager) {
 func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
 	statsHandle := do.StatsHandle()
 	for i, ctx := range ctxList {
-		statsHandle.StatsLoad.SubCtxs[i] = ctx
+		statsHandle.SetSubCtxs(i, ctx)
 		do.wg.Add(1)
 		go statsHandle.SubLoadWorker(ctx, do.exit, do.wg)
 	}
diff --git a/pkg/planner/core/casetest/planstats/BUILD.bazel b/pkg/planner/core/casetest/planstats/BUILD.bazel
index 094d33c59f3ac..c789c86065f71 100644
--- a/pkg/planner/core/casetest/planstats/BUILD.bazel
+++ b/pkg/planner/core/casetest/planstats/BUILD.bazel
@@ -21,7 +21,7 @@ go_test(
         "//pkg/sessionctx",
         "//pkg/sessionctx/stmtctx",
         "//pkg/statistics",
-        "//pkg/statistics/handle",
+        "//pkg/statistics/handle/util",
         "//pkg/table",
         "//pkg/testkit",
         "//pkg/testkit/testdata",
diff --git a/pkg/planner/core/casetest/planstats/plan_stats_test.go b/pkg/planner/core/casetest/planstats/plan_stats_test.go
index f29b42d2bfbf4..7be6b9eeb0c6a 100644
--- a/pkg/planner/core/casetest/planstats/plan_stats_test.go
+++ b/pkg/planner/core/casetest/planstats/plan_stats_test.go
@@ -32,7 +32,7 @@ import (
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/pkg/statistics"
-	"github.com/pingcap/tidb/pkg/statistics/handle"
+	utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/testdata"
@@ -268,7 +268,7 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
 	neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
 	resultCh := make(chan stmtctx.StatsLoadResult, 1)
 	timeout := time.Duration(1<<63 - 1)
-	task := &handle.NeededItemTask{
+	task := &utilstats.NeededItemTask{
 		TableItemID: neededColumn,
 		ResultCh:    resultCh,
 		ToTimeout:   time.Now().Local().Add(timeout),
diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel
index 1a872a1b55589..229888fe212f1 100644
--- a/pkg/statistics/handle/BUILD.bazel
+++ b/pkg/statistics/handle/BUILD.bazel
@@ -6,7 +6,6 @@ go_library(
     srcs = [
         "bootstrap.go",
        "ddl.go",
        "handle.go",
-        "handle_hist.go",
     ],
     importpath = "github.com/pingcap/tidb/pkg/statistics/handle",
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/ddl/util",
         "//pkg/infoschema",
         "//pkg/kv",
-        "//pkg/metrics",
"//pkg/parser/model", "//pkg/parser/mysql", "//pkg/parser/terror", @@ -29,14 +27,13 @@ go_library( "//pkg/statistics/handle/history", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/storage", + "//pkg/statistics/handle/syncload", "//pkg/statistics/handle/usage", "//pkg/statistics/handle/util", "//pkg/types", - "//pkg/util", "//pkg/util/chunk", "//pkg/util/logutil", "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_tiancaiamao_gp//:gp", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", @@ -48,25 +45,19 @@ go_test( timeout = "short", srcs = [ "ddl_test.go", - "handle_hist_test.go", "main_test.go", ], embed = [":handle"], flaky = True, race = "on", - shard_count = 8, + shard_count = 4, deps = [ - "//pkg/config", "//pkg/parser/model", "//pkg/planner/cardinality", - "//pkg/sessionctx", - "//pkg/sessionctx/stmtctx", "//pkg/testkit", "//pkg/testkit/testsetup", "//pkg/types", - "//pkg/util/mathutil", "//pkg/util/mock", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", ], diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index 4c01d10e10f48..7eb89c99b431f 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -18,11 +18,9 @@ import ( "math" "time" - "github.com/pingcap/tidb/pkg/config" ddlUtil "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze" "github.com/pingcap/tidb/pkg/statistics/handle/cache" @@ -30,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/history" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" "github.com/pingcap/tidb/pkg/statistics/handle/storage" + "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/util/logutil" @@ -70,6 +69,9 @@ type Handle struct { // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. util.StatsAnalyze + // StatsSyncLoad is used to load stats syncly. + util.StatsSyncLoad + // StatsReadWriter is used to read/write stats from/to storage. util.StatsReadWriter @@ -94,9 +96,6 @@ type Handle struct { // StatsCache ... util.StatsCache - // StatsLoad is used to load stats concurrently - StatsLoad StatsLoad - lease atomic2.Duration } @@ -111,7 +110,6 @@ func (h *Handle) Clear() { // NewHandle creates a Handle for update stats. 
 func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool util.SessionPool, tracker sessionctx.SysProcTracker, autoAnalyzeProcIDGetter func() uint64) (*Handle, error) {
-	cfg := config.GetGlobalConfig()
 	handle := &Handle{
 		gpool:      gp.New(math.MaxInt16, time.Minute),
 		ddlEventCh: make(chan *ddlUtil.Event, 1000),
@@ -135,11 +133,8 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti
 	handle.StatsHistory = history.NewStatsHistory(handle)
 	handle.StatsUsage = usage.NewStatsUsageImpl(handle)
 	handle.StatsAnalyze = autoanalyze.NewStatsAnalyze(handle)
+	handle.StatsSyncLoad = syncload.NewStatsSyncLoad(handle)
 	handle.StatsGlobal = globalstats.NewStatsGlobal(handle)
-	handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
-	handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
-	handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
-	handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
 	return handle, nil
 }

diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel
new file mode 100644
index 0000000000000..910011c8f6f00
--- /dev/null
+++ b/pkg/statistics/handle/syncload/BUILD.bazel
@@ -0,0 +1,44 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "syncload",
+    srcs = ["stats_syncload.go"],
+    importpath = "github.com/pingcap/tidb/pkg/statistics/handle/syncload",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/config",
+        "//pkg/metrics",
+        "//pkg/parser/model",
+        "//pkg/parser/mysql",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
+        "//pkg/statistics",
+        "//pkg/statistics/handle/storage",
+        "//pkg/statistics/handle/util",
+        "//pkg/types",
+        "//pkg/util",
+        "//pkg/util/logutil",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@org_uber_go_zap//:zap",
+    ],
+)
+
+go_test(
+    name = "syncload_test",
+    timeout = "short",
+    srcs = ["stats_syncload_test.go"],
+    flaky = True,
+    race = "on",
+    shard_count = 4,
+    deps = [
+        "//pkg/config",
+        "//pkg/parser/model",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
+        "//pkg/testkit",
+        "//pkg/util/mathutil",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/pkg/statistics/handle/handle_hist.go b/pkg/statistics/handle/syncload/stats_syncload.go
similarity index 72%
rename from pkg/statistics/handle/handle_hist.go
rename to pkg/statistics/handle/syncload/stats_syncload.go
index 924fda56698b0..b071cdac5523b 100644
--- a/pkg/statistics/handle/handle_hist.go
+++ b/pkg/statistics/handle/syncload/stats_syncload.go
@@ -1,4 +1,4 @@
-// Copyright 2021 PingCAP, Inc.
+// Copyright 2023 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,11 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package handle
+package syncload

 import (
 	"fmt"
-	"sync"
 	"time"

 	"github.com/pingcap/errors"
@@ -36,30 +35,36 @@ import (
 	"go.uber.org/zap"
 )

+type statsSyncLoad struct {
+	statsHandle utilstats.StatsHandle
+	StatsLoad   utilstats.StatsLoad
+}
+
+// NewStatsSyncLoad creates a new StatsSyncLoad.
+func NewStatsSyncLoad(statsHandle utilstats.StatsHandle) utilstats.StatsSyncLoad {
+	s := &statsSyncLoad{statsHandle: statsHandle}
+	cfg := config.GetGlobalConfig()
+	s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
+	s.StatsLoad.NeededItemsCh = make(chan *utilstats.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	s.StatsLoad.TimeoutItemsCh = make(chan *utilstats.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
+	return s
+}
+
 type statsWrapper struct {
 	col *statistics.Column
 	idx *statistics.Index
 }

-// StatsLoad is used to load stats concurrently
-type StatsLoad struct {
-	NeededItemsCh  chan *NeededItemTask
-	TimeoutItemsCh chan *NeededItemTask
-	WorkingColMap  map[model.TableItemID][]chan stmtctx.StatsLoadResult
-	SubCtxs        []sessionctx.Context
-	sync.Mutex
-}
-
-// NeededItemTask represents one needed column/indices with expire time.
-type NeededItemTask struct {
-	ToTimeout   time.Time
-	ResultCh    chan stmtctx.StatsLoadResult
-	TableItemID model.TableItemID
+// SetSubCtxs sets the sessionctx which is used to run queries in the background.
+// TODO: use SessionPool instead.
+func (s *statsSyncLoad) SetSubCtxs(idx int, sctx sessionctx.Context) {
+	s.StatsLoad.SubCtxs[idx] = sctx
 }

 // SendLoadRequests send neededColumns requests
-func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error {
-	remainedItems := h.removeHistLoadedColumns(neededHistItems)
+func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error {
+	remainedItems := s.removeHistLoadedColumns(neededHistItems)
 	failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) {
 		if sc.OptimizeTracer != nil {
@@ -76,9 +81,9 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
 	sc.StatsLoad.Timeout = timeout
 	sc.StatsLoad.NeededItems = remainedItems
 	sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
-	tasks := make([]*NeededItemTask, 0)
+	tasks := make([]*utilstats.NeededItemTask, 0)
 	for _, item := range remainedItems {
-		task := &NeededItemTask{
+		task := &utilstats.NeededItemTask{
 			TableItemID: item,
 			ToTimeout:   time.Now().Local().Add(timeout),
 			ResultCh:    sc.StatsLoad.ResultCh,
@@ -89,7 +94,7 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
 	defer timer.Stop()
 	for _, task := range tasks {
 		select {
-		case h.StatsLoad.NeededItemsCh <- task:
+		case s.StatsLoad.NeededItemsCh <- task:
 			continue
 		case <-timer.C:
 			return errors.New("sync load stats channel is full and timeout sending task to channel")
@@ -100,7 +105,7 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
 }

 // SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout
-func (*Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
+func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
 	if len(sc.StatsLoad.NeededItems) <= 0 {
 		return nil
 	}
@@ -141,10 +146,10 @@ func (*Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
 }

 // removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache.
-func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []model.TableItemID {
+func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.TableItemID) []model.TableItemID {
 	remainedItems := make([]model.TableItemID, 0, len(neededItems))
 	for _, item := range neededItems {
-		tbl, ok := h.Get(item.TableID)
+		tbl, ok := s.statsHandle.Get(item.TableID)
 		if !ok {
 			continue
 		}
@@ -161,11 +166,11 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
 }

 // AppendNeededItem appends needed columns/indices to ch, it is only used for test
-func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
+func (s *statsSyncLoad) AppendNeededItem(task *utilstats.NeededItemTask, timeout time.Duration) error {
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
 	select {
-	case h.StatsLoad.NeededItemsCh <- task:
+	case s.StatsLoad.NeededItemsCh <- task:
 	case <-timer.C:
 		return errors.New("Channel is full and timeout writing to channel")
 	}
@@ -175,22 +180,22 @@ func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) e
 var errExit = errors.New("Stop loading since domain is closed")

 // SubLoadWorker loads hist data for each column
-func (h *Handle) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
+func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
 	defer func() {
 		exitWg.Done()
 		logutil.BgLogger().Info("SubLoadWorker exited.")
 	}()
 	// if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
-	var lastTask *NeededItemTask
+	var lastTask *utilstats.NeededItemTask
 	for {
-		task, err := h.HandleOneTask(sctx, lastTask, exit)
+		task, err := s.HandleOneTask(sctx, lastTask, exit)
 		lastTask = task
 		if err != nil {
 			switch err {
 			case errExit:
 				return
 			default:
-				time.Sleep(h.Lease() / 10)
+				time.Sleep(s.statsHandle.Lease() / 10)
 				continue
 			}
 		}
@@ -198,7 +203,7 @@ func (h *Handle) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exit
 }

 // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
-func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) {
+func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *utilstats.NeededItemTask, exit chan struct{}) (task *utilstats.NeededItemTask, err error) {
 	defer func() {
 		// recover for each task, worker keeps working
 		if r := recover(); r != nil {
@@ -207,7 +212,7 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
 		}
 	}()
 	if lastTask == nil {
-		task, err = h.drainColTask(exit)
+		task, err = s.drainColTask(exit)
 		if err != nil {
 			if err != errExit {
 				logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
@@ -217,15 +222,15 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
 	} else {
 		task = lastTask
 	}
-	return h.handleOneItemTask(sctx, task)
+	return s.handleOneItemTask(sctx, task)
 }

-func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (*NeededItemTask, error) {
+func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *utilstats.NeededItemTask) (*utilstats.NeededItemTask, error) {
 	result := stmtctx.StatsLoadResult{Item: task.TableItemID}
 	item := result.Item
-	tbl, ok := h.Get(item.TableID)
+	tbl, ok := s.statsHandle.Get(item.TableID)
 	if !ok {
-		h.writeToResultChan(task.ResultCh, result)
+		s.writeToResultChan(task.ResultCh, result)
 		return nil, nil
 	}
 	var err error
@@ -233,27 +238,27 @@ func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask
 	if item.IsIndex {
 		index, ok := tbl.Indices[item.ID]
 		if !ok || index.IsFullLoad() {
-			h.writeToResultChan(task.ResultCh, result)
+			s.writeToResultChan(task.ResultCh, result)
 			return nil, nil
 		}
 		wrapper.idx = index
 	} else {
 		col, ok := tbl.Columns[item.ID]
 		if !ok || col.IsFullLoad() {
-			h.writeToResultChan(task.ResultCh, result)
+			s.writeToResultChan(task.ResultCh, result)
 			return nil, nil
 		}
 		wrapper.col = col
 	}
 	// to avoid duplicated handling in concurrent scenario
-	working := h.setWorking(result.Item, task.ResultCh)
+	working := s.setWorking(result.Item, task.ResultCh)
 	if !working {
-		h.writeToResultChan(task.ResultCh, result)
+		s.writeToResultChan(task.ResultCh, result)
 		return nil, nil
 	}
 	t := time.Now()
 	needUpdate := false
-	wrapper, err = h.readStatsForOneItem(sctx, item, wrapper)
+	wrapper, err = s.readStatsForOneItem(sctx, item, wrapper)
 	if err != nil {
 		result.Error = err
 		return task, err
@@ -268,15 +273,15 @@ func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask
 		}
 	}
 	metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
-	if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
-		h.writeToResultChan(task.ResultCh, result)
+	if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx) {
+		s.writeToResultChan(task.ResultCh, result)
 	}
-	h.finishWorking(result)
+	s.finishWorking(result)
 	return nil, nil
 }

 // readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
-func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) {
+func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) {
 	failpoint.Inject("mockReadStatsForOnePanic", nil)
 	failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) {
 		if val.(bool) {
@@ -362,33 +367,33 @@ func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItem
 }

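// Illustrative sketch, not part of this patch: how a caller on the planner side
// might drive the relocated sync-load API. It assumes `h` is a *handle.Handle
// (which now embeds util.StatsSyncLoad) and `sc` is the *stmtctx.StatementContext
// of the running statement; `tblInfo` and `colInfo` stand in for whatever
// table/column the optimizer needs.
//
//	item := model.TableItemID{TableID: tblInfo.ID, ID: colInfo.ID, IsIndex: false}
//	// Queue the request; items whose histograms are already fully loaded are filtered out.
//	if err := h.SendLoadRequests(sc, []model.TableItemID{item}, time.Second); err != nil {
//		return err
//	}
//	// Block until the SubLoadWorker goroutines deliver the stats or the timeout recorded in sc.StatsLoad expires.
//	if err := h.SyncWaitStatsLoad(sc); err != nil {
//		return err
//	}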
 // drainColTask will hang until a column task can return, and either task or error will be returned.
-func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
+func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*utilstats.NeededItemTask, error) {
 	// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
 	for {
 		select {
 		case <-exit:
 			return nil, errExit
-		case task, ok := <-h.StatsLoad.NeededItemsCh:
+		case task, ok := <-s.StatsLoad.NeededItemsCh:
 			if !ok {
 				return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed")
 			}
 			// if the task has already timeout, no sql is sync-waiting for it,
 			// so do not handle it just now, put it to another channel with lower priority
 			if time.Now().After(task.ToTimeout) {
-				h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
 				continue
 			}
 			return task, nil
-		case task, ok := <-h.StatsLoad.TimeoutItemsCh:
+		case task, ok := <-s.StatsLoad.TimeoutItemsCh:
 			select {
 			case <-exit:
 				return nil, errExit
-			case task0, ok0 := <-h.StatsLoad.NeededItemsCh:
+			case task0, ok0 := <-s.StatsLoad.NeededItemsCh:
 				if !ok0 {
 					return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed")
 				}
 				// send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh
-				h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
 				return task0, nil
 			default:
 				if !ok {
@@ -402,7 +407,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
 }

 // writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task.
-func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemTask) {
+func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask) {
 	select {
 	case taskCh <- task:
 	default:
@@ -410,7 +415,7 @@ func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemT
 }

 // writeToChanWithTimeout writes a task to a channel and blocks until timeout.
-func (*Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *NeededItemTask, timeout time.Duration) error {
+func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask, timeout time.Duration) error {
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
 	select {
@@ -422,7 +427,7 @@ func (*Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *NeededI
 }

 // writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
-func (*Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
+func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
 	defer func() {
 		if r := recover(); r != nil {
 			logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
@@ -435,12 +440,12 @@ func (*Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtc
 }

 // updateCachedItem updates the column/index hist to global statsCache.
-func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index) (updated bool) {
-	h.StatsLoad.Lock()
-	defer h.StatsLoad.Unlock()
+func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index) (updated bool) {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
 	// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
 	// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
-	tbl, ok := h.Get(item.TableID)
+	tbl, ok := s.statsHandle.Get(item.TableID)
 	if !ok {
 		return true
 	}
@@ -459,35 +464,35 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
 		tbl = tbl.Copy()
 		tbl.Indices[item.ID] = idxHist
 	}
-	h.UpdateStatsCache([]*statistics.Table{tbl}, nil)
+	s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
 	return true
 }

-func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
-	h.StatsLoad.Lock()
-	defer h.StatsLoad.Unlock()
-	chList, ok := h.StatsLoad.WorkingColMap[item]
+func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
+	chList, ok := s.StatsLoad.WorkingColMap[item]
 	if ok {
 		if chList[0] == resultCh {
 			return true // just return for duplicate setWorking
 		}
-		h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
+		s.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
 		return false
 	}
 	chList = []chan stmtctx.StatsLoadResult{}
 	chList = append(chList, resultCh)
-	h.StatsLoad.WorkingColMap[item] = chList
+	s.StatsLoad.WorkingColMap[item] = chList
 	return true
 }

-func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
-	h.StatsLoad.Lock()
-	defer h.StatsLoad.Unlock()
-	if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
+func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
+	if chList, ok := s.StatsLoad.WorkingColMap[result.Item]; ok {
 		list := chList[1:]
 		for _, ch := range list {
-			h.writeToResultChan(ch, result)
+			s.writeToResultChan(ch, result)
 		}
 	}
-	delete(h.StatsLoad.WorkingColMap, result.Item)
+	delete(s.StatsLoad.WorkingColMap, result.Item)
 }
diff --git a/pkg/statistics/handle/handle_hist_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go
similarity index 92%
rename from pkg/statistics/handle/handle_hist_test.go
rename to pkg/statistics/handle/syncload/stats_syncload_test.go
index 28d95020510ce..356dda7e6dd2d 100644
--- a/pkg/statistics/handle/handle_hist_test.go
+++ b/pkg/statistics/handle/syncload/stats_syncload_test.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package handle_test
+package syncload_test

 import (
 	"testing"
@@ -40,14 +40,14 @@ func TestSyncLoadSkipUnAnalyzedItems(t *testing.T) {
 	h.SetLease(1)

 	// no item would be loaded
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems", `return(0)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems", `return(0)`))
 	tk.MustQuery("trace plan select * from t where a > 10")
-	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems")
+	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems")
 	tk.MustExec("analyze table t1")
 	// one column would be loaded
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems", `return(1)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems", `return(1)`))
 	tk.MustQuery("trace plan select * from t1 where a > 10")
-	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems")
+	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems")
 }

 func TestConcurrentLoadHist(t *testing.T) {
@@ -175,11 +175,11 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 		inTerms  string
 	}{
 		{
-			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOnePanic",
+			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOnePanic",
 			inTerms:  "panic",
 		},
 		{
-			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOneFail",
+			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail",
 			inTerms:  "return(true)",
 		},
 	}
@@ -206,18 +206,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 		task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
 		require.Error(t, err1)
 		require.NotNil(t, task1)
-		list, ok := h.StatsLoad.WorkingColMap[neededColumns[0]]
-		require.True(t, ok)
-		require.Len(t, list, 1)
-		require.Equal(t, stmtCtx1.StatsLoad.ResultCh, list[0])

 		task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
 		require.Nil(t, err2)
 		require.Nil(t, task2)
-		list, ok = h.StatsLoad.WorkingColMap[neededColumns[0]]
-		require.True(t, ok)
-		require.Len(t, list, 2)
-		require.Equal(t, stmtCtx2.StatsLoad.ResultCh, list[1])

 		require.NoError(t, failpoint.Disable(fp.failPath))
 		task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
diff --git a/pkg/statistics/handle/util/BUILD.bazel b/pkg/statistics/handle/util/BUILD.bazel
index 2b3e27d834cdb..292161a37817c 100644
--- a/pkg/statistics/handle/util/BUILD.bazel
+++ b/pkg/statistics/handle/util/BUILD.bazel
@@ -16,10 +16,12 @@ go_library(
         "//pkg/parser/model",
         "//pkg/parser/terror",
         "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
         "//pkg/sessionctx/variable",
         "//pkg/statistics",
         "//pkg/table",
         "//pkg/types",
+        "//pkg/util",
         "//pkg/util/chunk",
         "//pkg/util/intest",
         "//pkg/util/sqlexec",
diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go
index eb894d22118e4..6d2d1d9db81c0 100644
--- a/pkg/statistics/handle/util/interfaces.go
+++ b/pkg/statistics/handle/util/interfaces.go
@@ -16,14 +16,17 @@ package util

 import (
 	"context"
+	"sync"
 	"time"

 	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tiancaiamao/gp" ) @@ -294,6 +297,44 @@ type StatsReadWriter interface { SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) } +// NeededItemTask represents one needed column/indices with expire time. +type NeededItemTask struct { + ToTimeout time.Time + ResultCh chan stmtctx.StatsLoadResult + TableItemID model.TableItemID +} + +// StatsLoad is used to load stats concurrently +type StatsLoad struct { + NeededItemsCh chan *NeededItemTask + TimeoutItemsCh chan *NeededItemTask + WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult + SubCtxs []sessionctx.Context + sync.Mutex +} + +// StatsSyncLoad implement the sync-load feature. +type StatsSyncLoad interface { + // SendLoadRequests sends load requests to the channel. + SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error + + // SyncWaitStatsLoad will wait for the load requests to finish. + SyncWaitStatsLoad(sc *stmtctx.StatementContext) error + + // AppendNeededItem appends a needed item to the channel. + AppendNeededItem(task *NeededItemTask, timeout time.Duration) error + + // SubLoadWorker will start a goroutine to handle the load requests. + SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) + + // HandleOneTask will handle one task. + HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) + + // SetSubCtxs sets the sessionctx which is used to run queries background. + // TODO: use SessionPool instead. + SetSubCtxs(idx int, sctx sessionctx.Context) +} + // StatsGlobal is used to manage partition table global stats. type StatsGlobal interface { // MergePartitionStats2GlobalStatsByTableID merges partition stats to global stats by table ID.