Skip to content

Commit

Permalink
executors: record the index usage in each executors (#50643)
Browse files Browse the repository at this point in the history
close #50261
  • Loading branch information
YangKeao authored Jan 30, 2024
1 parent 25f44f5 commit 7087f70
Show file tree
Hide file tree
Showing 23 changed files with 889 additions and 75 deletions.
9 changes: 9 additions & 0 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/logutil/consistency"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/tikv/client-go/v2/tikvrpc"
)

// BatchPointGetExec executes a bunch of point select queries.
type BatchPointGetExec struct {
exec.BaseExecutor
indexUsageReporter *exec.IndexUsageReporter

tblInfo *model.TableInfo
idxInfo *model.IndexInfo
Expand Down Expand Up @@ -164,6 +166,13 @@ func (e *BatchPointGetExec) Close() error {
if e.RuntimeStats() != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
if e.indexUsageReporter != nil && e.idxInfo != nil {
kvReqTotal := e.stats.GetCmdRPCCount(tikvrpc.CmdBatchGet)
// We cannot distinguish how many rows are coming from each partition. Here, we record all index usages on the
// table itself (but not for the partition physical table), which is different from how an index usage is
// treated for a local index.
e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.idxInfo.ID, e.ID(), kvReqTotal)
}
e.inited = 0
e.index = 0
return nil
Expand Down
141 changes: 84 additions & 57 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3639,28 +3639,30 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
return nil, err
}
paging := b.ctx.GetSessionVars().EnablePaging

e := &IndexReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
}

for _, col := range v.OutputColumns {
Expand Down Expand Up @@ -3827,28 +3829,29 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}

e := &IndexLookUpExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: indexReq,
startTS: startTS,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
byItems: is.ByItems,
desc: is.Desc,
tableRequest: tableReq,
columns: ts.Columns,
indexPaging: indexPaging,
dataReaderBuilder: readerBuilder,
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
idxNetDataSize: v.GetAvgTableRowSize(),
avgRowSize: v.GetAvgTableRowSize(),
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: indexReq,
startTS: startTS,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
byItems: is.ByItems,
desc: is.Desc,
tableRequest: tableReq,
columns: ts.Columns,
indexPaging: indexPaging,
dataReaderBuilder: readerBuilder,
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
idxNetDataSize: v.GetAvgTableRowSize(),
avgRowSize: v.GetAvgTableRowSize(),
}

if v.ExtraHandleCol != nil {
Expand Down Expand Up @@ -3988,8 +3991,10 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
}

paging := b.ctx.GetSessionVars().EnablePaging

e := &IndexMergeReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPBs: partialReqs,
startTS: startTS,
table: tblInfo,
Expand Down Expand Up @@ -4017,6 +4022,27 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
return e, nil
}

type tableStatsPreloader interface {
LoadTableStats(sessionctx.Context)
}

func (b *executorBuilder) buildIndexUsageReporter(plan tableStatsPreloader) (indexUsageReporter *exec.IndexUsageReporter) {
sc := b.ctx.GetSessionVars().StmtCtx
if b.ctx.GetSessionVars().StmtCtx.IndexUsageCollector != nil &&
sc.RuntimeStatsColl != nil {
// Preload the table stats. If the statement is a point-get or execute, the planner may not have loaded the
// stats.
plan.LoadTableStats(b.ctx)

statsMap := sc.GetUsedStatsInfo(false)
indexUsageReporter = exec.NewIndexUsageReporter(
sc.IndexUsageCollector,
sc.RuntimeStatsColl, statsMap)
}

return indexUsageReporter
}

func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) exec.Executor {
if b.Ti != nil {
b.Ti.UseIndexMerge = true
Expand Down Expand Up @@ -5013,20 +5039,21 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan

decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
keepOrder: plan.KeepOrder,
desc: plan.Desc,
lock: plan.Lock,
waitTime: plan.LockWaitTime,
partExpr: plan.PartitionExpr,
partPos: plan.PartitionColPos,
planPhysIDs: plan.PartitionIDs,
singlePart: plan.SinglePart,
partTblID: plan.PartTblID,
columns: plan.Columns,
BaseExecutor: exec.NewBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
indexUsageReporter: b.buildIndexUsageReporter(plan),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
keepOrder: plan.KeepOrder,
desc: plan.Desc,
lock: plan.Lock,
waitTime: plan.LockWaitTime,
partExpr: plan.PartitionExpr,
partPos: plan.PartitionColPos,
planPhysIDs: plan.PartitionIDs,
singlePart: plan.SinglePart,
partTblID: plan.PartTblID,
columns: plan.Columns,
}

e.snapshot, err = b.getSnapshot()
Expand Down
12 changes: 12 additions & 0 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func rebuildIndexRanges(ctx sessionctx.Context, is *plannercore.PhysicalIndexSca
// IndexReaderExecutor sends dag request and reads index data from kv layer.
type IndexReaderExecutor struct {
exec.BaseExecutor
indexUsageReporter *exec.IndexUsageReporter

// For a partitioned table, the IndexReaderExecutor works on a partition, so
// the type of this table field is actually `table.PhysicalTable`.
Expand Down Expand Up @@ -224,6 +225,10 @@ func (e *IndexReaderExecutor) setDummy() {

// Close clears all resources hold by current object.
func (e *IndexReaderExecutor) Close() (err error) {
if e.indexUsageReporter != nil {
e.indexUsageReporter.ReportCopIndexUsage(e.physicalTableID, e.index.ID, e.plans[0].ID())
}

if e.result != nil {
err = e.result.Close()
}
Expand Down Expand Up @@ -413,6 +418,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
// IndexLookUpExecutor implements double read for index scan.
type IndexLookUpExecutor struct {
exec.BaseExecutor
indexUsageReporter *exec.IndexUsageReporter

table table.Table
index *model.IndexInfo
Expand Down Expand Up @@ -824,6 +830,12 @@ func (e *IndexLookUpExecutor) Close() error {
if e.stats != nil {
defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats)
}
if e.indexUsageReporter != nil {
e.indexUsageReporter.ReportCopIndexUsage(
e.table.Meta().ID,
e.index.ID,
e.idxPlans[0].ID())
}
e.kvRanges = e.kvRanges[:0]
if e.dummy {
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,9 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
reuseObj = nil
}
sc.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(reuseObj)

// also enable index usage collector
sc.IndexUsageCollector = ctx.NewStmtIndexUsageCollector()
}

sc.ForcePlanCache = fixcontrol.GetBoolWithDefault(vars.OptimizerFixControl, fixcontrol.Fix49736, false)
Expand Down
12 changes: 12 additions & 0 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const (
// 3. if accessed, just ignore it.
type IndexMergeReaderExecutor struct {
exec.BaseExecutor
indexUsageReporter *exec.IndexUsageReporter

table table.Table
indexes []*model.IndexInfo
Expand Down Expand Up @@ -910,6 +911,17 @@ func (e *IndexMergeReaderExecutor) Close() error {
if e.stats != nil {
defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats)
}
if e.indexUsageReporter != nil {
tableID := e.table.Meta().ID
for _, p := range e.partialPlans {
is, ok := p[0].(*plannercore.PhysicalIndexScan)
if !ok {
continue
}

e.indexUsageReporter.ReportCopIndexUsage(tableID, is.Index.ID, is.ID())
}
}
if e.finished == nil {
return nil
}
Expand Down
32 changes: 30 additions & 2 deletions pkg/executor/internal/exec/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "exec",
srcs = ["executor.go"],
srcs = [
"executor.go",
"indexusage.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/internal/exec",
visibility = ["//pkg/executor:__subpackages__"],
deps = [
"//pkg/domain",
"//pkg/expression",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/usage/indexusage",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
Expand All @@ -21,3 +27,25 @@ go_library(
"@com_github_ngaut_pools//:pools",
],
)

go_test(
name = "exec_test",
timeout = "short",
srcs = ["indexusage_test.go"],
flaky = True,
shard_count = 5,
deps = [
":exec",
"//pkg/domain",
"//pkg/parser/model",
"//pkg/sessionctx/stmtctx",
"//pkg/statistics",
"//pkg/statistics/handle/usage/indexusage",
"//pkg/testkit",
"//pkg/types/parser_driver",
"//pkg/util/logutil",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
],
)
Loading

0 comments on commit 7087f70

Please sign in to comment.