diff --git a/pkg/executor/batch_point_get.go b/pkg/executor/batch_point_get.go index 9de61a9b4f6fc..4ca7f282f4d16 100644 --- a/pkg/executor/batch_point_get.go +++ b/pkg/executor/batch_point_get.go @@ -38,11 +38,13 @@ import ( "github.com/pingcap/tidb/pkg/util/hack" "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/rowcodec" + "github.com/tikv/client-go/v2/tikvrpc" ) // BatchPointGetExec executes a bunch of point select queries. type BatchPointGetExec struct { exec.BaseExecutor + indexUsageReporter *exec.IndexUsageReporter tblInfo *model.TableInfo idxInfo *model.IndexInfo @@ -164,6 +166,13 @@ func (e *BatchPointGetExec) Close() error { if e.RuntimeStats() != nil && e.snapshot != nil { e.snapshot.SetOption(kv.CollectRuntimeStats, nil) } + if e.indexUsageReporter != nil && e.idxInfo != nil { + kvReqTotal := e.stats.GetCmdRPCCount(tikvrpc.CmdBatchGet) + // We cannot distinguish how many rows are coming from each partition. Here, we record all index usages on the + // table itself (but not for the partition physical table), which is different from how an index usage is + // treated for a local index. + e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.idxInfo.ID, e.ID(), kvReqTotal) + } e.inited = 0 e.index = 0 return nil diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index c20c576857185..7c11362f72066 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -3639,28 +3639,30 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea return nil, err } paging := b.ctx.GetSessionVars().EnablePaging + e := &IndexReaderExecutor{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), - dagPB: dagReq, - startTS: startTS, - txnScope: b.txnScope, - readReplicaScope: b.readReplicaScope, - isStaleness: b.isStaleness, - netDataSize: v.GetNetDataSize(), - physicalTableID: physicalTableID, - table: tbl, - index: is.Index, - keepOrder: is.KeepOrder, - desc: is.Desc, - columns: is.Columns, - byItems: is.ByItems, - paging: paging, - corColInFilter: b.corColInDistPlan(v.IndexPlans), - corColInAccess: b.corColInAccess(v.IndexPlans[0]), - idxCols: is.IdxCols, - colLens: is.IdxColLens, - plans: v.IndexPlans, - outputColumns: v.OutputColumns, + BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), + indexUsageReporter: b.buildIndexUsageReporter(v), + dagPB: dagReq, + startTS: startTS, + txnScope: b.txnScope, + readReplicaScope: b.readReplicaScope, + isStaleness: b.isStaleness, + netDataSize: v.GetNetDataSize(), + physicalTableID: physicalTableID, + table: tbl, + index: is.Index, + keepOrder: is.KeepOrder, + desc: is.Desc, + columns: is.Columns, + byItems: is.ByItems, + paging: paging, + corColInFilter: b.corColInDistPlan(v.IndexPlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + plans: v.IndexPlans, + outputColumns: v.OutputColumns, } for _, col := range v.OutputColumns { @@ -3827,28 +3829,29 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } e := &IndexLookUpExecutor{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), - dagPB: indexReq, - startTS: startTS, - table: tbl, - index: is.Index, - keepOrder: is.KeepOrder, - byItems: is.ByItems, - desc: is.Desc, - tableRequest: tableReq, - columns: ts.Columns, - indexPaging: indexPaging, - dataReaderBuilder: readerBuilder, - corColInIdxSide: b.corColInDistPlan(v.IndexPlans), - corColInTblSide: b.corColInDistPlan(v.TablePlans), - corColInAccess: b.corColInAccess(v.IndexPlans[0]), - idxCols: is.IdxCols, - colLens: is.IdxColLens, - idxPlans: v.IndexPlans, - tblPlans: v.TablePlans, - PushedLimit: v.PushedLimit, - idxNetDataSize: v.GetAvgTableRowSize(), - avgRowSize: v.GetAvgTableRowSize(), + BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), + indexUsageReporter: b.buildIndexUsageReporter(v), + dagPB: indexReq, + startTS: startTS, + table: tbl, + index: is.Index, + keepOrder: is.KeepOrder, + byItems: is.ByItems, + desc: is.Desc, + tableRequest: tableReq, + columns: ts.Columns, + indexPaging: indexPaging, + dataReaderBuilder: readerBuilder, + corColInIdxSide: b.corColInDistPlan(v.IndexPlans), + corColInTblSide: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + idxPlans: v.IndexPlans, + tblPlans: v.TablePlans, + PushedLimit: v.PushedLimit, + idxNetDataSize: v.GetAvgTableRowSize(), + avgRowSize: v.GetAvgTableRowSize(), } if v.ExtraHandleCol != nil { @@ -3988,8 +3991,10 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd } paging := b.ctx.GetSessionVars().EnablePaging + e := &IndexMergeReaderExecutor{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), + indexUsageReporter: b.buildIndexUsageReporter(v), dagPBs: partialReqs, startTS: startTS, table: tblInfo, @@ -4017,6 +4022,27 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd return e, nil } +type tableStatsPreloader interface { + LoadTableStats(sessionctx.Context) +} + +func (b *executorBuilder) buildIndexUsageReporter(plan tableStatsPreloader) (indexUsageReporter *exec.IndexUsageReporter) { + sc := b.ctx.GetSessionVars().StmtCtx + if b.ctx.GetSessionVars().StmtCtx.IndexUsageCollector != nil && + sc.RuntimeStatsColl != nil { + // Preload the table stats. If the statement is a point-get or execute, the planner may not have loaded the + // stats. + plan.LoadTableStats(b.ctx) + + statsMap := sc.GetUsedStatsInfo(false) + indexUsageReporter = exec.NewIndexUsageReporter( + sc.IndexUsageCollector, + sc.RuntimeStatsColl, statsMap) + } + + return indexUsageReporter +} + func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) exec.Executor { if b.Ti != nil { b.Ti.UseIndexMerge = true @@ -5013,20 +5039,21 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo) e := &BatchPointGetExec{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, plan.Schema(), plan.ID()), - tblInfo: plan.TblInfo, - idxInfo: plan.IndexInfo, - rowDecoder: decoder, - keepOrder: plan.KeepOrder, - desc: plan.Desc, - lock: plan.Lock, - waitTime: plan.LockWaitTime, - partExpr: plan.PartitionExpr, - partPos: plan.PartitionColPos, - planPhysIDs: plan.PartitionIDs, - singlePart: plan.SinglePart, - partTblID: plan.PartTblID, - columns: plan.Columns, + BaseExecutor: exec.NewBaseExecutor(b.ctx, plan.Schema(), plan.ID()), + indexUsageReporter: b.buildIndexUsageReporter(plan), + tblInfo: plan.TblInfo, + idxInfo: plan.IndexInfo, + rowDecoder: decoder, + keepOrder: plan.KeepOrder, + desc: plan.Desc, + lock: plan.Lock, + waitTime: plan.LockWaitTime, + partExpr: plan.PartitionExpr, + partPos: plan.PartitionColPos, + planPhysIDs: plan.PartitionIDs, + singlePart: plan.SinglePart, + partTblID: plan.PartTblID, + columns: plan.Columns, } e.snapshot, err = b.getSnapshot() diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index d20bdbcb4d0cf..129452a79e88b 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -165,6 +165,7 @@ func rebuildIndexRanges(ctx sessionctx.Context, is *plannercore.PhysicalIndexSca // IndexReaderExecutor sends dag request and reads index data from kv layer. type IndexReaderExecutor struct { exec.BaseExecutor + indexUsageReporter *exec.IndexUsageReporter // For a partitioned table, the IndexReaderExecutor works on a partition, so // the type of this table field is actually `table.PhysicalTable`. @@ -224,6 +225,10 @@ func (e *IndexReaderExecutor) setDummy() { // Close clears all resources hold by current object. func (e *IndexReaderExecutor) Close() (err error) { + if e.indexUsageReporter != nil { + e.indexUsageReporter.ReportCopIndexUsage(e.physicalTableID, e.index.ID, e.plans[0].ID()) + } + if e.result != nil { err = e.result.Close() } @@ -413,6 +418,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) // IndexLookUpExecutor implements double read for index scan. type IndexLookUpExecutor struct { exec.BaseExecutor + indexUsageReporter *exec.IndexUsageReporter table table.Table index *model.IndexInfo @@ -824,6 +830,12 @@ func (e *IndexLookUpExecutor) Close() error { if e.stats != nil { defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) } + if e.indexUsageReporter != nil { + e.indexUsageReporter.ReportCopIndexUsage( + e.table.Meta().ID, + e.index.ID, + e.idxPlans[0].ID()) + } e.kvRanges = e.kvRanges[:0] if e.dummy { return nil diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 0c46ae014296a..1d181e269fa08 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -2235,6 +2235,9 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { reuseObj = nil } sc.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(reuseObj) + + // also enable index usage collector + sc.IndexUsageCollector = ctx.NewStmtIndexUsageCollector() } sc.ForcePlanCache = fixcontrol.GetBoolWithDefault(vars.OptimizerFixControl, fixcontrol.Fix49736, false) diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go index abbf814ea3c46..8e168823b9604 100644 --- a/pkg/executor/index_merge_reader.go +++ b/pkg/executor/index_merge_reader.go @@ -86,6 +86,7 @@ const ( // 3. if accessed, just ignore it. type IndexMergeReaderExecutor struct { exec.BaseExecutor + indexUsageReporter *exec.IndexUsageReporter table table.Table indexes []*model.IndexInfo @@ -910,6 +911,17 @@ func (e *IndexMergeReaderExecutor) Close() error { if e.stats != nil { defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) } + if e.indexUsageReporter != nil { + tableID := e.table.Meta().ID + for _, p := range e.partialPlans { + is, ok := p[0].(*plannercore.PhysicalIndexScan) + if !ok { + continue + } + + e.indexUsageReporter.ReportCopIndexUsage(tableID, is.Index.ID, is.ID()) + } + } if e.finished == nil { return nil } diff --git a/pkg/executor/internal/exec/BUILD.bazel b/pkg/executor/internal/exec/BUILD.bazel index d78905e102458..9001c4892ec1f 100644 --- a/pkg/executor/internal/exec/BUILD.bazel +++ b/pkg/executor/internal/exec/BUILD.bazel @@ -1,15 +1,21 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "exec", - srcs = ["executor.go"], + srcs = [ + "executor.go", + "indexusage.go", + ], importpath = "github.com/pingcap/tidb/pkg/executor/internal/exec", visibility = ["//pkg/executor:__subpackages__"], deps = [ "//pkg/domain", "//pkg/expression", "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", + "//pkg/statistics", + "//pkg/statistics/handle/usage/indexusage", "//pkg/types", "//pkg/util", "//pkg/util/chunk", @@ -21,3 +27,25 @@ go_library( "@com_github_ngaut_pools//:pools", ], ) + +go_test( + name = "exec_test", + timeout = "short", + srcs = ["indexusage_test.go"], + flaky = True, + shard_count = 5, + deps = [ + ":exec", + "//pkg/domain", + "//pkg/parser/model", + "//pkg/sessionctx/stmtctx", + "//pkg/statistics", + "//pkg/statistics/handle/usage/indexusage", + "//pkg/testkit", + "//pkg/types/parser_driver", + "//pkg/util/logutil", + "@com_github_pingcap_tipb//go-tipb", + "@com_github_stretchr_testify//require", + "@org_uber_go_zap//:zap", + ], +) diff --git a/pkg/executor/internal/exec/indexusage.go b/pkg/executor/internal/exec/indexusage.go new file mode 100644 index 0000000000000..39b11e7b012e0 --- /dev/null +++ b/pkg/executor/internal/exec/indexusage.go @@ -0,0 +1,91 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exec + +import ( + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" + "github.com/pingcap/tidb/pkg/util/execdetails" +) + +// IndexUsageReporter is a toolkit to report index usage +type IndexUsageReporter struct { + reporter *indexusage.StmtIndexUsageCollector + runtimeStatsColl *execdetails.RuntimeStatsColl + statsMap map[int64]*stmtctx.UsedStatsInfoForTable +} + +// NewIndexUsageReporter creates an index usage reporter util +func NewIndexUsageReporter(reporter *indexusage.StmtIndexUsageCollector, + runtimeStatsColl *execdetails.RuntimeStatsColl, + statsMap map[int64]*stmtctx.UsedStatsInfoForTable) *IndexUsageReporter { + return &IndexUsageReporter{ + reporter: reporter, + runtimeStatsColl: runtimeStatsColl, + statsMap: statsMap, + } +} + +// ReportCopIndexUsage reports the index usage to the inside collector +func (e *IndexUsageReporter) ReportCopIndexUsage(tableID int64, indexID int64, planID int) { + tableRowCount, ok := e.getTableRowCount(tableID) + if !ok { + // skip if the table is empty or the stats is not valid + return + } + + copStats := e.runtimeStatsColl.GetCopStats(planID) + if copStats == nil { + return + } + copStats.Lock() + defer copStats.Unlock() + kvReq := copStats.GetTasks() + accessRows := copStats.GetActRows() + + sample := indexusage.NewSample(0, uint64(kvReq), uint64(accessRows), uint64(tableRowCount)) + e.reporter.Update(tableID, indexID, sample) +} + +// ReportPointGetIndexUsage reports the index usage of a point get or batch point get +func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, indexID int64, planID int, kvRequestTotal int64) { + tableRowCount, ok := e.getTableRowCount(tableID) + if !ok { + // skip if the table is empty or the stats is not valid + return + } + + basic := e.runtimeStatsColl.GetBasicRuntimeStats(planID) + if basic == nil { + return + } + accessRows := basic.GetActRows() + + sample := indexusage.NewSample(0, uint64(kvRequestTotal), uint64(accessRows), uint64(tableRowCount)) + e.reporter.Update(tableID, indexID, sample) +} + +// getTableRowCount returns the `RealtimeCount` of a table +func (e *IndexUsageReporter) getTableRowCount(tableID int64) (int64, bool) { + stats, ok := e.statsMap[tableID] + if !ok { + return 0, false + } + if stats.Version == statistics.PseudoVersion { + return 0, false + } + return stats.RealtimeCount, true +} diff --git a/pkg/executor/internal/exec/indexusage_test.go b/pkg/executor/internal/exec/indexusage_test.go new file mode 100644 index 0000000000000..e38cc06527a66 --- /dev/null +++ b/pkg/executor/internal/exec/indexusage_test.go @@ -0,0 +1,452 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exec_test + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" + "github.com/pingcap/tidb/pkg/testkit" + driver "github.com/pingcap/tidb/pkg/types/parser_driver" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tipb/go-tipb" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestIndexUsageReporter(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + tableID := int64(1) + indexID := int64(2) + + sc := tk.Session().GetSessionVars().StmtCtx + statsMap := sc.GetUsedStatsInfo(true) + statsMap[tableID] = &stmtctx.UsedStatsInfoForTable{ + Version: 123, + RealtimeCount: 100, + } + reporter := exec.NewIndexUsageReporter(sc.IndexUsageCollector, sc.RuntimeStatsColl, statsMap) + runtimeStatsColl := sc.RuntimeStatsColl + + // For PointGet and BatchPointGet + planID := 3 + runtimeStatsColl.GetBasicRuntimeStats(planID).Record(time.Second, 2024) + reporter.ReportPointGetIndexUsage(tableID, indexID, planID, 1) + + require.Eventually(t, func() bool { + tk.Session().ReportUsageStats() + usage := dom.StatsHandle().GetIndexUsage(tableID, indexID) + return usage.QueryTotal == 1 && usage.RowAccessTotal == 2024 && usage.KvReqTotal == 1 + }, time.Second*5, time.Millisecond) + + // For Index Scan + planID = 4 + rows := uint64(2024) + zero := uint64(0) + executorID := "test-executor" + runtimeStatsColl.GetOrCreateCopStats(planID, "test-store").RecordOneCopTask("1", &tipb.ExecutorExecutionSummary{ + TimeProcessedNs: &zero, + NumProducedRows: &rows, + NumIterations: &zero, + ExecutorId: &executorID, + Concurrency: &zero, + }) + reporter.ReportCopIndexUsage(tableID, indexID, planID) + + require.Eventually(t, func() bool { + tk.Session().ReportUsageStats() + usage := dom.StatsHandle().GetIndexUsage(tableID, indexID) + return usage.QueryTotal == 1 && usage.RowAccessTotal == 2024+2024 && usage.KvReqTotal == 2 + }, time.Second*5, time.Millisecond) + + // If the version is pseudo, skip it + statsMap[tableID] = &stmtctx.UsedStatsInfoForTable{ + Version: statistics.PseudoVersion, + RealtimeCount: 100, + } + planID = 4 + runtimeStatsColl.GetBasicRuntimeStats(planID).Record(time.Second, 2024) + reporter.ReportPointGetIndexUsage(tableID, indexID, planID, 1) + + require.Eventually(t, func() bool { + tk.Session().ReportUsageStats() + usage := dom.StatsHandle().GetIndexUsage(tableID, indexID) + return usage.QueryTotal == 1 && usage.RowAccessTotal == 2024+2024 && usage.KvReqTotal == 2 + }, time.Second*5, time.Millisecond) +} + +type indexStatsExpect struct { + tableID int64 + idxID int64 + samples []indexusage.Sample +} + +type testCase struct { + sql string + havePlan string + expects []indexStatsExpect +} + +func mergeTwoSample(s1 *indexusage.Sample, s2 indexusage.Sample) { + s1.QueryTotal += s2.QueryTotal + s1.RowAccessTotal += s2.RowAccessTotal + s1.KvReqTotal += s2.KvReqTotal + for i, val := range s2.PercentageAccess { + s1.PercentageAccess[i] += val + } + + if s1.LastUsedAt.Before(s2.LastUsedAt) { + s1.LastUsedAt = s2.LastUsedAt + } +} + +func runIndexUsageTestCases(t *testing.T, dom *domain.Domain, tk *testkit.TestKit, cases []testCase) { + existUsageMap := make(map[indexusage.GlobalIndexID]*indexusage.Sample) + for _, c := range cases { + if len(c.havePlan) > 0 { + tk.MustHavePlan(c.sql, c.havePlan) + } + // a special case to support preparing statement + if strings.HasPrefix(c.sql, "prepare") || strings.HasPrefix(c.sql, "PREPARE") { + tk.MustExec(c.sql) + continue + } + tk.MustQuery(c.sql) + + for _, idx := range c.expects { + globalIdxID := indexusage.GlobalIndexID{TableID: idx.tableID, IndexID: idx.idxID} + + existUsage := existUsageMap[globalIdxID] + if existUsage == nil { + existUsage = &indexusage.Sample{} + existUsageMap[globalIdxID] = existUsage + } + for _, s := range idx.samples { + mergeTwoSample(existUsage, s) + } + + existUsageMap[globalIdxID] = existUsage + } + + require.Eventuallyf(t, func() bool { + tk.Session().ReportUsageStats() + + result := true + for _, idx := range c.expects { + usage := dom.StatsHandle().GetIndexUsage(idx.tableID, idx.idxID) + require.NotNil(t, usage) + + globalIdxID := indexusage.GlobalIndexID{TableID: idx.tableID, IndexID: idx.idxID} + existUsage := existUsageMap[globalIdxID] + + condition := existUsage.QueryTotal == usage.QueryTotal && + existUsage.KvReqTotal <= usage.KvReqTotal && + existUsage.RowAccessTotal == usage.RowAccessTotal && + existUsage.PercentageAccess == usage.PercentageAccess + result = result && condition + logutil.BgLogger().Info("assert index usage", + zap.Uint64("QueryTotal", usage.QueryTotal), + zap.Uint64("RowAccessTotal", usage.RowAccessTotal), + zap.Uint64("KvReqTotal", usage.KvReqTotal), + zap.Uint64s("PercentageAccess", usage.PercentageAccess[:]), + zap.Uint64("Expect QueryTotal", existUsage.QueryTotal), + zap.Uint64("Expect RowAccessTotal", existUsage.RowAccessTotal), + zap.Uint64("Expect KvReqTotal", existUsage.KvReqTotal), + zap.Uint64s("Expect PercentageAccess", existUsage.PercentageAccess[:]), + zap.Bool("result", condition), + zap.String("sql", c.sql)) + } + return result + }, time.Second*5, time.Millisecond*100, "test for sql %s failed", c.sql) + } +} + +func wrapTestCaseWithPrepare(cases []testCase) []testCase { + result := make([]testCase, 0, len(cases)*3) + for _, c := range cases { + if !strings.HasPrefix(c.sql, "select") && + !strings.HasPrefix(c.sql, "SELECT") { + result = append(result, c) + continue + } + // For a statement beginning with select, try to prepare/execute it. + result = append(result, []testCase{ + { + fmt.Sprintf("prepare test_stmt from %s", driver.WrapInSingleQuotes(c.sql)), + "", + []indexStatsExpect{}, + }, + // First "Execute" statement is to initialize the plan cache. + { + "execute test_stmt", + "", + c.expects, + }, + // The second "Execute" statement will test whether index collector will successfully get a total rows for + // the table + { + "execute test_stmt", + "", + c.expects, + }, + }...) + } + return result +} + +func TestIndexUsageReporterWithRealData(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (id_1 int, id_2 int, unique key idx_1(id_1), unique key idx_2(id_2))") + + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableID := table.Meta().ID + idx1ID := int64(0) + idx2ID := int64(0) + for _, idx := range table.Indices() { + if idx.Meta().Name.L == "idx_1" { + idx1ID = idx.Meta().ID + } + if idx.Meta().Name.L == "idx_2" { + idx2ID = idx.Meta().ID + } + } + + for i := 0; i < 100; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + tk.MustExec("analyze table t") + tk.RefreshSession() + tk.MustExec("use test") + + cases := []testCase{ + { + "select id_1 from t where id_1 >= 30", + "IndexReader", + []indexStatsExpect{{tableID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 70, 100)}}}, + }, + { + "select id_1 from t where id_1 - 95 >= 0 and id_1 >= 90", + "IndexReader", + []indexStatsExpect{{tableID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 10, 100)}}}, + }, + { + "select id_2 from t use index(idx_1) where id_1 >= 30 and id_1 - 50 >= 0", + "IndexLookUp", + []indexStatsExpect{{tableID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 70, 100)}}}, + }, + { + "select /*+ USE_INDEX_MERGE(t, idx_1, idx_2) */ id_2 from t where id_1 >= 30 and id_2 >= 80 and id_2 - 95 >= 0", + "IndexMerge", + []indexStatsExpect{ + {tableID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 70, 100)}}, + {tableID, idx2ID, []indexusage.Sample{indexusage.NewSample(1, 1, 20, 100)}}, + }, + }, + { + "select * from t where id_1 = 1", + "Point_Get", + []indexStatsExpect{ + {tableID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 1, 100)}}, + }, + }, + { + "select id_1 from t where id_1 = 1 or id_1 = 50 or id_1 = 25", + "Batch_Point_Get", + []indexStatsExpect{ + {tableID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 3, 100)}}, + }, + }, + } + + runIndexUsageTestCases(t, dom, tk, append(cases, wrapTestCaseWithPrepare(cases)...)) +} + +func TestIndexUsageReporterWithPartitionTable(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table t (id_1 int, unique key idx_1(id_1)) +partition by range (id_1) ( +partition p0 values less than (10), +partition p1 values less than (20), +partition p2 values less than (50), +partition p3 values less than MAXVALUE)`) + + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + var pids []int64 + for i := 0; i < 4; i++ { + pids = append(pids, table.Meta().Partition.GetPartitionIDByName(fmt.Sprintf("p%d", i))) + } + idx1ID := int64(0) + for _, idx := range table.Indices() { + if idx.Meta().Name.L == "idx_1" { + idx1ID = idx.Meta().ID + } + } + + for i := 0; i < 100; i++ { + tk.MustExec("insert into t values (?)", i) + } + tk.MustExec("analyze table t") + tk.RefreshSession() + tk.MustExec("use test") + tk.MustExec("set @@tidb_partition_prune_mode = 'static'") + + cases := []testCase{ + // IndexReader on two partitions + { + "select id_1 from t where id_1 >= 30", + "PartitionUnion", + []indexStatsExpect{ + {pids[2], idx1ID, []indexusage.Sample{ + indexusage.NewSample(1, 1, 20, 30), + }}, + {pids[3], idx1ID, []indexusage.Sample{ + indexusage.NewSample(1, 1, 50, 50), + }}, + }, + }, + // IndexReader with selection on a single partition + { + "select id_1 from t where id_1 - 95 >= 0 and id_1 >= 90", + "IndexReader", + []indexStatsExpect{ + {pids[3], idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 10, 50)}}, + }, + }, + // PointGet in a partition + { + "select * from t where id_1 = 1", + "Point_Get", + []indexStatsExpect{ + {pids[0], idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 1, 10)}}, + }, + }, + // BatchPointGet in a partition + { + "select * from t where id_1 in (1,3,5,9)", + "Batch_Point_Get", + []indexStatsExpect{ + {table.Meta().ID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 4, 100)}}, + }, + }, + } + + runIndexUsageTestCases(t, dom, tk, append(cases, wrapTestCaseWithPrepare(cases)...)) +} + +func TestIndexUsageReporterWithGlobalIndex(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_global_index='on'") + tk.MustExec(`create table t (pk int primary key, id_1 int, unique key idx_1(id_1)) +partition by range (pk) ( +partition p0 values less than (10), +partition p1 values less than (20), +partition p2 values less than (50), +partition p3 values less than MAXVALUE)`) + + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + idx1ID := int64(0) + for _, idx := range table.Indices() { + if idx.Meta().Name.L == "idx_1" { + idx1ID = idx.Meta().ID + } + } + + for i := 0; i < 100; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + tk.MustExec("analyze table t") + tk.RefreshSession() + tk.MustExec("use test") + tk.MustExec("set tidb_enable_global_index='on'") + tk.MustExec("set @@tidb_partition_prune_mode = 'static'") + + cases := []testCase{ + // PointGet on global index + { + "select * from t use index(idx_1) where id_1 = 1", + "Point_Get", + []indexStatsExpect{ + {table.Meta().ID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 1, 100)}}, + }, + }, + // BatchPointGet on global index + { + "select * from t ignore index(primary) where id_1 in (1,3,5,9)", + "Batch_Point_Get", + []indexStatsExpect{ + {table.Meta().ID, idx1ID, []indexusage.Sample{indexusage.NewSample(1, 1, 4, 100)}}, + }, + }, + } + + runIndexUsageTestCases(t, dom, tk, append(cases, wrapTestCaseWithPrepare(cases)...)) +} + +func TestDisableIndexUsageReporter(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (id_1 int, id_2 int, unique key idx_1(id_1), unique key idx_2(id_2))") + + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableID := table.Meta().ID + idx1ID := int64(0) + for _, idx := range table.Indices() { + if idx.Meta().Name.L == "idx_1" { + idx1ID = idx.Meta().ID + } + } + + for i := 0; i < 100; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + tk.MustExec("analyze table t") + tk.RefreshSession() + tk.MustExec("use test") + tk.MustQuery("select id_1 from t where id_1 >= 30") + tk.RefreshSession() + require.Eventually(t, func() bool { + return dom.StatsHandle().GetIndexUsage(tableID, idx1ID).QueryTotal == 1 + }, time.Second*5, time.Millisecond*100) + tk.MustExec("use test") + // turn off the index usage collection + tk.MustExec("set tidb_enable_collect_execution_info='OFF'") + tk.MustQuery("select id_1 from t where id_1 >= 30") + tk.RefreshSession() + for i := 0; i < 10; i++ { + require.Equal(t, uint64(1), dom.StatsHandle().GetIndexUsage(tableID, idx1ID).QueryTotal) + time.Sleep(time.Millisecond * 100) + } +} diff --git a/pkg/executor/point_get.go b/pkg/executor/point_get.go index 75cc599d73cec..bb30ec4fee760 100644 --- a/pkg/executor/point_get.go +++ b/pkg/executor/point_get.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/rowcodec" + "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" ) @@ -57,10 +58,11 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) exec.Execut } e := &PointGetExecutor{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, p.Schema(), p.ID()), - txnScope: b.txnScope, - readReplicaScope: b.readReplicaScope, - isStaleness: b.isStaleness, + BaseExecutor: exec.NewBaseExecutor(b.ctx, p.Schema(), p.ID()), + indexUsageReporter: b.buildIndexUsageReporter(p), + txnScope: b.txnScope, + readReplicaScope: b.readReplicaScope, + isStaleness: b.isStaleness, } e.SetInitCap(1) @@ -116,6 +118,7 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) exec.Execut // PointGetExecutor executes point select query. type PointGetExecutor struct { exec.BaseExecutor + indexUsageReporter *exec.IndexUsageReporter tblInfo *model.TableInfo handle kv.Handle @@ -200,6 +203,14 @@ func (e *PointGetExecutor) Close() error { if e.RuntimeStats() != nil && e.snapshot != nil { e.snapshot.SetOption(kv.CollectRuntimeStats, nil) } + if e.indexUsageReporter != nil && e.idxInfo != nil { + tableID := e.tblInfo.ID + if e.partitionDef != nil { + tableID = e.partitionDef.ID + } + kvReqTotal := e.stats.SnapshotRuntimeStats.GetCmdRPCCount(tikvrpc.CmdGet) + e.indexUsageReporter.ReportPointGetIndexUsage(tableID, e.idxInfo.ID, e.ID(), kvReqTotal) + } e.done = false return nil } diff --git a/pkg/planner/core/physical_plans.go b/pkg/planner/core/physical_plans.go index 6bf372df63dbe..50574fe567218 100644 --- a/pkg/planner/core/physical_plans.go +++ b/pkg/planner/core/physical_plans.go @@ -401,6 +401,12 @@ func (p *PhysicalIndexReader) MemoryUsage() (sum int64) { return } +// LoadTableStats preloads the stats data for the physical table +func (p *PhysicalIndexReader) LoadTableStats(ctx sessionctx.Context) { + is := p.IndexPlans[0].(*PhysicalIndexScan) + loadTableStats(ctx, is.Table, is.physicalTableID) +} + // PushedDownLimit is the limit operator pushed down into PhysicalIndexLookUpReader. type PushedDownLimit struct { Offset uint64 @@ -551,6 +557,12 @@ func (p *PhysicalIndexLookUpReader) MemoryUsage() (sum int64) { return } +// LoadTableStats preloads the stats data for the physical table +func (p *PhysicalIndexLookUpReader) LoadTableStats(ctx sessionctx.Context) { + ts := p.TablePlans[0].(*PhysicalTableScan) + loadTableStats(ctx, ts.Table, ts.physicalTableID) +} + // PhysicalIndexMergeReader is the reader using multiple indexes in tidb. type PhysicalIndexMergeReader struct { physicalSchemaProducer @@ -651,6 +663,12 @@ func (p *PhysicalIndexMergeReader) MemoryUsage() (sum int64) { return } +// LoadTableStats preloads the stats data for the physical table +func (p *PhysicalIndexMergeReader) LoadTableStats(ctx sessionctx.Context) { + ts := p.TablePlans[0].(*PhysicalTableScan) + loadTableStats(ctx, ts.Table, ts.physicalTableID) +} + // PhysicalIndexScan represents an index scan plan. type PhysicalIndexScan struct { physicalSchemaProducer diff --git a/pkg/planner/core/point_get_plan.go b/pkg/planner/core/point_get_plan.go index fa315ece4b392..fc44906f3ee68 100644 --- a/pkg/planner/core/point_get_plan.go +++ b/pkg/planner/core/point_get_plan.go @@ -308,6 +308,15 @@ func (p *PointGetPlan) MemoryUsage() (sum int64) { return } +// LoadTableStats preloads the stats data for the physical table +func (p *PointGetPlan) LoadTableStats(ctx sessionctx.Context) { + tableID := p.TblInfo.ID + if p.PartitionDef != nil { + tableID = p.PartitionDef.ID + } + loadTableStats(ctx, p.TblInfo, tableID) +} + // BatchPointGetPlan represents a physical plan which contains a bunch of // keys reference the same table and use the same `unique key` type BatchPointGetPlan struct { @@ -528,6 +537,14 @@ func (p *BatchPointGetPlan) MemoryUsage() (sum int64) { return } +// LoadTableStats preloads the stats data for the physical table +func (p *BatchPointGetPlan) LoadTableStats(ctx sessionctx.Context) { + // as a `BatchPointGet` can access multiple partitions, and we cannot distinguish how many rows come from each + // partitions in the existing statistics information, we treat all index usage through a `BatchPointGet` just + // like a normal global index. + loadTableStats(ctx, p.TblInfo, p.TblInfo.ID) +} + // PointPlanKey is used to get point plan that is pre-built for multi-statement query. const PointPlanKey = stringutil.StringerStr("pointPlanKey") diff --git a/pkg/planner/core/stats.go b/pkg/planner/core/stats.go index 67c1f71b245d8..68165c1655d22 100644 --- a/pkg/planner/core/stats.go +++ b/pkg/planner/core/stats.go @@ -1067,3 +1067,25 @@ func (p *LogicalSequence) DeriveStats(childStats []*property.StatsInfo, _ *expre p.SetStats(childStats[len(childStats)-1]) return p.StatsInfo(), nil } + +// loadTableStats loads the stats of the table and store it in the statement `UsedStatsInfo` if it didn't exist +func loadTableStats(ctx sessionctx.Context, tblInfo *model.TableInfo, pid int64) { + statsRecord := ctx.GetSessionVars().StmtCtx.GetUsedStatsInfo(true) + if _, ok := statsRecord[pid]; ok { + return + } + + tableStats := getStatsTable(ctx, tblInfo, pid) + name, _ := getTblInfoForUsedStatsByPhysicalID(ctx, pid) + + statsRecord[pid] = &stmtctx.UsedStatsInfoForTable{ + Name: name, + TblInfo: tblInfo, + RealtimeCount: tableStats.HistColl.RealtimeCount, + ModifyCount: tableStats.HistColl.ModifyCount, + Version: tableStats.Version, + } + if tableStats.Pseudo { + statsRecord[pid].Version = statistics.PseudoVersion + } +} diff --git a/pkg/session/session.go b/pkg/session/session.go index 74434e02fffa1..b657c1361e425 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3051,7 +3051,9 @@ func CreateSessionWithOpt(store kv.Storage, opt *Opt) (types.Session, error) { // which periodically updates stats using the collected data. if do.StatsHandle() != nil && do.StatsUpdating() { s.statsCollector = do.StatsHandle().NewSessionStatsItem().(*usage.SessionStatsItem) - s.idxUsageCollector = do.StatsHandle().NewSessionIndexUsageCollector() + if config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load() { + s.idxUsageCollector = do.StatsHandle().NewSessionIndexUsageCollector() + } } return s, nil @@ -3602,7 +3604,7 @@ func attachStatsCollector(s *session, dom *domain.Domain) *session { if s.statsCollector == nil { s.statsCollector = dom.StatsHandle().NewSessionStatsItem().(*usage.SessionStatsItem) } - if s.idxUsageCollector == nil { + if s.idxUsageCollector == nil && config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load() { s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector() } } @@ -4364,6 +4366,16 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo zap.String("sql", stmtNode.Text())) } +// NewStmtIndexUsageCollector creates a new `*indexusage.StmtIndexUsageCollector` based on the internal session index +// usage collector +func (s *session) NewStmtIndexUsageCollector() *indexusage.StmtIndexUsageCollector { + if s.idxUsageCollector == nil { + return nil + } + + return indexusage.NewStmtIndexUsageCollector(s.idxUsageCollector) +} + // RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver. func RemoveLockDDLJobs(s types.Session, job2ver map[int64]int64, job2ids map[int64]string, printLog bool) { sv := s.GetSessionVars() diff --git a/pkg/sessionctx/BUILD.bazel b/pkg/sessionctx/BUILD.bazel index b18e2816cd015..0def5d860d80f 100644 --- a/pkg/sessionctx/BUILD.bazel +++ b/pkg/sessionctx/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/parser/model", "//pkg/sessionctx/sessionstates", "//pkg/sessionctx/variable", + "//pkg/statistics/handle/usage/indexusage", "//pkg/util", "//pkg/util/kvcache", "//pkg/util/plancache", diff --git a/pkg/sessionctx/context.go b/pkg/sessionctx/context.go index bf647cd8d2b1c..da06fcfc1e0fd 100644 --- a/pkg/sessionctx/context.go +++ b/pkg/sessionctx/context.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/kvcache" utilpc "github.com/pingcap/tidb/pkg/util/plancache" @@ -183,6 +184,8 @@ type Context interface { DisableSandBoxMode() // ReportUsageStats reports the usage stats to the global collector ReportUsageStats() + // NewStmtIndexUsageCollector creates a new index usage collector for statement + NewStmtIndexUsageCollector() *indexusage.StmtIndexUsageCollector } // TxnFuture is an interface where implementations have a kv.Transaction field and after diff --git a/pkg/sessionctx/stmtctx/BUILD.bazel b/pkg/sessionctx/stmtctx/BUILD.bazel index b5e153f093cb2..8e6eee1d19d99 100644 --- a/pkg/sessionctx/stmtctx/BUILD.bazel +++ b/pkg/sessionctx/stmtctx/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/parser/terror", + "//pkg/statistics/handle/usage/indexusage", "//pkg/types", "//pkg/util/context", "//pkg/util/dbterror/plannererrors", diff --git a/pkg/sessionctx/stmtctx/stmtctx.go b/pkg/sessionctx/stmtctx/stmtctx.go index 095cd585b179f..32c11e3b905bd 100644 --- a/pkg/sessionctx/stmtctx/stmtctx.go +++ b/pkg/sessionctx/stmtctx/stmtctx.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" "github.com/pingcap/tidb/pkg/types" contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" @@ -257,15 +258,16 @@ type StatementContext struct { DiskTracker *disk.Tracker // per statement resource group name // hint /* +ResourceGroup(name) */ can change the statement group name - ResourceGroupName string - RunawayChecker *resourcegroup.RunawayChecker - IsTiFlash atomic2.Bool - RuntimeStatsColl *execdetails.RuntimeStatsColl - TableIDs []int64 - IndexNames []string - StmtType string - OriginalSQL string - digestMemo struct { + ResourceGroupName string + RunawayChecker *resourcegroup.RunawayChecker + IsTiFlash atomic2.Bool + RuntimeStatsColl *execdetails.RuntimeStatsColl + IndexUsageCollector *indexusage.StmtIndexUsageCollector + TableIDs []int64 + IndexNames []string + StmtType string + OriginalSQL string + digestMemo struct { sync.Once normalized string digest *parser.Digest diff --git a/pkg/statistics/handle/usage/indexusage/BUILD.bazel b/pkg/statistics/handle/usage/indexusage/BUILD.bazel index c34c4babe1dd3..d1f9216cba450 100644 --- a/pkg/statistics/handle/usage/indexusage/BUILD.bazel +++ b/pkg/statistics/handle/usage/indexusage/BUILD.bazel @@ -17,6 +17,6 @@ go_test( srcs = ["collector_test.go"], embed = [":indexusage"], flaky = True, - shard_count = 3, + shard_count = 4, deps = ["@com_github_stretchr_testify//require"], ) diff --git a/pkg/statistics/handle/usage/indexusage/collector.go b/pkg/statistics/handle/usage/indexusage/collector.go index 82e390d780031..f1edfb800125f 100644 --- a/pkg/statistics/handle/usage/indexusage/collector.go +++ b/pkg/statistics/handle/usage/indexusage/collector.go @@ -232,3 +232,38 @@ func (s *SessionIndexUsageCollector) Flush() { s.collector.SendDeltaSync(s.indexUsage) s.indexUsage = make(indexUsage) } + +// StmtIndexUsageCollector removes the duplicates index for recording `QueryTotal` in session collector +type StmtIndexUsageCollector struct { + recordedIndex map[GlobalIndexID]struct{} + sessionCollector *SessionIndexUsageCollector + sync.Mutex +} + +// NewStmtIndexUsageCollector creates a new StmtIndexUsageCollector. +func NewStmtIndexUsageCollector(sessionCollector *SessionIndexUsageCollector) *StmtIndexUsageCollector { + return &StmtIndexUsageCollector{ + recordedIndex: make(map[GlobalIndexID]struct{}), + sessionCollector: sessionCollector, + } +} + +// Update updates the index usage in the internal session collector. The `sample.QueryTotal` will be modified according +// to whether this index has been recorded in this statement usage collector. +func (s *StmtIndexUsageCollector) Update(tableID int64, indexID int64, sample Sample) { + // The session index usage collector and the map inside cannot be updated concurrently. However, for executors with + // multiple workers, it's possible for them to be closed (and update stats) at the same time, so a lock is needed + // here. + s.Lock() + defer s.Unlock() + + idxID := GlobalIndexID{IndexID: indexID, TableID: tableID} + if _, ok := s.recordedIndex[idxID]; !ok { + sample.QueryTotal = 1 + s.recordedIndex[idxID] = struct{}{} + } else { + sample.QueryTotal = 0 + } + + s.sessionCollector.Update(tableID, indexID, sample) +} diff --git a/pkg/statistics/handle/usage/indexusage/collector_test.go b/pkg/statistics/handle/usage/indexusage/collector_test.go index a40cd5a4828c0..8ea7423f36e18 100644 --- a/pkg/statistics/handle/usage/indexusage/collector_test.go +++ b/pkg/statistics/handle/usage/indexusage/collector_test.go @@ -19,6 +19,7 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -214,3 +215,45 @@ func BenchmarkIndexCollector(b *testing.B) { benchmarkIndexCollector(b, 8) }) } + +func TestStmtIndexUsageCollector(t *testing.T) { + iuc := NewCollector() + iuc.StartWorker() + defer iuc.Close() + sessionCollector := iuc.SpawnSessionCollector() + + statementCollector := NewStmtIndexUsageCollector(sessionCollector) + statementCollector.Update(1, 1, NewSample(10, 0, 0, 0)) + sessionCollector.Flush() + require.Eventuallyf(t, func() bool { + return iuc.GetIndexUsage(1, 1) != Sample{} + }, time.Second, time.Millisecond, "wait for report") + require.Equal(t, iuc.GetIndexUsage(1, 1).QueryTotal, uint64(1)) + + // duplicated index will be ignored + statementCollector.Update(1, 1, NewSample(10, 0, 0, 0)) + sessionCollector.Flush() + require.Eventuallyf(t, func() bool { + iu := iuc.GetIndexUsage(1, 1) + emptySample := Sample{} + if iu != emptySample { + return iu.QueryTotal == 1 + } + return false + }, time.Second, time.Millisecond, "wait for report") + + statementCollector.Update(1, 2, NewSample(10, 0, 0, 0)) + sessionCollector.Flush() + require.Eventuallyf(t, func() bool { + return iuc.GetIndexUsage(1, 2) != Sample{} + }, time.Second, time.Millisecond, "wait for report") + require.Equal(t, iuc.GetIndexUsage(1, 2).QueryTotal, uint64(1)) + + // `queryTotal` will be 1, even if it's set 0 + statementCollector.Update(1, 3, NewSample(0, 0, 0, 0)) + sessionCollector.Flush() + require.Eventuallyf(t, func() bool { + return iuc.GetIndexUsage(1, 3) != Sample{} + }, time.Second, time.Millisecond, "wait for report") + require.Equal(t, iuc.GetIndexUsage(1, 3).QueryTotal, uint64(1)) +} diff --git a/pkg/util/execdetails/execdetails.go b/pkg/util/execdetails/execdetails.go index abc849f3f96e6..2d75b80abbc1a 100644 --- a/pkg/util/execdetails/execdetails.go +++ b/pkg/util/execdetails/execdetails.go @@ -612,6 +612,14 @@ func (crs *CopRuntimeStats) GetActRows() (totalRows int64) { return totalRows } +// GetTasks return total tasks of CopRuntimeStats +func (crs *CopRuntimeStats) GetTasks() (totalTasks int32) { + for _, instanceStats := range crs.stats { + totalTasks += instanceStats.totalTasks + } + return totalTasks +} + // MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information. func (crs *CopRuntimeStats) MergeBasicStats() (procTimes Percentile[Duration], totalTime time.Duration, totalTasks, totalLoops, totalThreads int32, totalTiFlashScanContext TiFlashScanContext) { totalTiFlashScanContext = TiFlashScanContext{} diff --git a/pkg/util/mock/BUILD.bazel b/pkg/util/mock/BUILD.bazel index 75ac693889df1..a2c5d14378b0c 100644 --- a/pkg/util/mock/BUILD.bazel +++ b/pkg/util/mock/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/sessionctx", "//pkg/sessionctx/sessionstates", "//pkg/sessionctx/variable", + "//pkg/statistics/handle/usage/indexusage", "//pkg/util", "//pkg/util/disk", "//pkg/util/memory", diff --git a/pkg/util/mock/context.go b/pkg/util/mock/context.go index 7cbab29ee61cb..969eab72645d0 100644 --- a/pkg/util/mock/context.go +++ b/pkg/util/mock/context.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/disk" "github.com/pingcap/tidb/pkg/util/memory" @@ -461,6 +462,11 @@ func (*Context) ReportUsageStats() {} // Close implements the sessionctx.Context interface. func (*Context) Close() {} +// NewStmtIndexUsageCollector implements the sessionctx.Context interface +func (*Context) NewStmtIndexUsageCollector() *indexusage.StmtIndexUsageCollector { + return nil +} + // NewContext creates a new mocked sessionctx.Context. func NewContext() *Context { ctx, cancel := context.WithCancel(context.Background())