Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execdetails: tiny optimization for executor stats #58186

Merged
merged 12 commits into from
Dec 17, 2024
18 changes: 9 additions & 9 deletions pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,27 +473,27 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro
}

// FillDummySummariesForTiFlashTasks fills dummy execution summaries for mpp tasks which lack summaries
func FillDummySummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, callee string, storeTypeName string, allPlanIDs []int, recordedPlanIDs map[int]int) {
func FillDummySummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, callee string, storeType kv.StoreType, allPlanIDs []int, recordedPlanIDs map[int]int) {
num := uint64(0)
dummySummary := &tipb.ExecutorExecutionSummary{TimeProcessedNs: &num, NumProducedRows: &num, NumIterations: &num, ExecutorId: nil}
for _, planID := range allPlanIDs {
if _, ok := recordedPlanIDs[planID]; !ok {
runtimeStatsColl.RecordOneCopTask(planID, storeTypeName, callee, dummySummary)
runtimeStatsColl.RecordOneCopTask(planID, storeType, callee, dummySummary)
}
}
}

// recordExecutionSummariesForTiFlashTasks records mpp task execution summaries
func recordExecutionSummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, executionSummaries []*tipb.ExecutorExecutionSummary, callee string, storeTypeName string, allPlanIDs []int) {
func recordExecutionSummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, executionSummaries []*tipb.ExecutorExecutionSummary, callee string, storeType kv.StoreType, allPlanIDs []int) {
var recordedPlanIDs = make(map[int]int)
for _, detail := range executionSummaries {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
recordedPlanIDs[runtimeStatsColl.
RecordOneCopTask(-1, storeTypeName, callee, detail)] = 0
RecordOneCopTask(-1, storeType, callee, detail)] = 0
}
}
FillDummySummariesForTiFlashTasks(runtimeStatsColl, callee, storeTypeName, allPlanIDs, recordedPlanIDs)
FillDummySummariesForTiFlashTasks(runtimeStatsColl, callee, storeType, allPlanIDs, recordedPlanIDs)
}

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) (err error) {
Expand Down Expand Up @@ -524,10 +524,10 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
r.stats.mergeCopRuntimeStats(copStats, respTime)

if copStats.ScanDetail != nil && len(r.copPlanIDs) > 0 {
r.ctx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), copStats.ScanDetail)
r.ctx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType, copStats.ScanDetail)
}
if len(r.copPlanIDs) > 0 {
r.ctx.RuntimeStatsColl.RecordTimeDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), &copStats.TimeDetail)
r.ctx.RuntimeStatsColl.RecordTimeDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType, &copStats.TimeDetail)
}

// If hasExecutor is true, it means the summary is returned from TiFlash.
Expand All @@ -551,7 +551,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
r.ctx.CPUUsage.MergeTikvCPUTime(copStats.TimeDetail.ProcessTime)
}
if hasExecutor {
recordExecutionSummariesForTiFlashTasks(r.ctx.RuntimeStatsColl, r.selectResp.GetExecutionSummaries(), callee, r.storeType.Name(), r.copPlanIDs)
recordExecutionSummariesForTiFlashTasks(r.ctx.RuntimeStatsColl, r.selectResp.GetExecutionSummaries(), callee, r.storeType, r.copPlanIDs)
} else {
// For cop task cases, we still need this protection.
if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
Expand All @@ -570,7 +570,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
r.ctx.RuntimeStatsColl.
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)
RecordOneCopTask(planID, r.storeType, callee, detail)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ func TestUpdateCopRuntimeStats(t *testing.T) {
require.Equal(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0)
require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String())
require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, kv.TiKV).String())
}
1 change: 1 addition & 0 deletions pkg/executor/internal/exec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
deps = [
":exec",
"//pkg/domain",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/sessionctx/stmtctx",
"//pkg/statistics",
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/internal/exec/indexusage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestIndexUsageReporter(t *testing.T) {
rows := uint64(2024)
zero := uint64(0)
executorID := "test-executor"
runtimeStatsColl.GetOrCreateCopStats(planID, "test-store").RecordOneCopTask("1", &tipb.ExecutorExecutionSummary{
runtimeStatsColl.GetOrCreateCopStats(planID, kv.TiKV).RecordOneCopTask("1", &tipb.ExecutorExecutionSummary{
TimeProcessedNs: &zero,
NumProducedRows: &rows,
NumIterations: &zero,
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func (c *localMppCoordinator) handleAllReports() error {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
recordedPlanIDs[c.sessionCtx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(-1, kv.TiFlash.Name(), report.mppReq.Meta.GetAddress(), detail)] = 0
RecordOneCopTask(-1, kv.TiFlash, report.mppReq.Meta.GetAddress(), detail)] = 0
}
}
if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil {
Expand All @@ -649,7 +649,7 @@ func (c *localMppCoordinator) handleAllReports() error {
}
}
}
distsql.FillDummySummariesForTiFlashTasks(c.sessionCtx.GetSessionVars().StmtCtx.RuntimeStatsColl, "", kv.TiFlash.Name(), c.planIDs, recordedPlanIDs)
distsql.FillDummySummariesForTiFlashTasks(c.sessionCtx.GetSessionVars().StmtCtx.RuntimeStatsColl, "", kv.TiFlash, c.planIDs, recordedPlanIDs)
case <-time.After(receiveReportTimeout):
metrics.MppCoordinatorStatsReportNotReceived.Inc()
logutil.BgLogger().Warn(fmt.Sprintf("Mpp coordinator not received all reports within %d seconds", int(receiveReportTimeout.Seconds())),
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/execdetails/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/util/execdetails",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"@com_github_influxdata_tdigest//:tdigest",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_tipb//go-tipb",
Expand All @@ -25,6 +26,7 @@ go_test(
flaky = True,
race = "on",
deps = [
"//pkg/kv",
"//pkg/testkit/testsetup",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stretchr_testify//require",
Expand Down
118 changes: 84 additions & 34 deletions pkg/util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/influxdata/tdigest"
"github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -553,7 +554,7 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) {
}

type basicCopRuntimeStats struct {
storeType string
storeType kv.StoreType
BasicRuntimeStats
threads int32
totalTasks int32
Expand Down Expand Up @@ -687,13 +688,24 @@ func (p *Percentile[valueType]) Sum() float64 {

// String implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) String() string {
if e.storeType == "tiflash" {
buf := bytes.NewBuffer(make([]byte, 0, 16))
buf.WriteString("time:")
buf.WriteString(FormatDuration(time.Duration(e.consume.Load())))
buf.WriteString(", loops:")
buf.WriteString(strconv.Itoa(int(e.loop.Load())))
if e.storeType == kv.TiFlash {
buf.WriteString(", threads:")
buf.WriteString(strconv.Itoa(int(e.threads)))
buf.WriteString(", ")
if e.tiflashWaitSummary.CanBeIgnored() {
return fmt.Sprintf("time:%v, loops:%d, threads:%d, %s", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load(), e.threads, e.tiflashScanContext.String())
buf.WriteString(e.tiflashScanContext.String())
} else {
buf.WriteString(e.tiflashWaitSummary.String())
buf.WriteString(", ")
buf.WriteString(e.tiflashScanContext.String())
}
return fmt.Sprintf("time:%v, loops:%d, threads:%d, %s, %s", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load(), e.threads, e.tiflashWaitSummary.String(), e.tiflashScanContext.String())
}
return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load())
return buf.String()
}

// Clone implements the RuntimeStats interface.
Expand Down Expand Up @@ -748,8 +760,7 @@ type CopRuntimeStats struct {
stats map[string]*basicCopRuntimeStats
scanDetail *util.ScanDetail
timeDetail *util.TimeDetail
// do not use kv.StoreType because it will meet cycle import error
storeType string
storeType kv.StoreType
sync.Mutex
}

Expand Down Expand Up @@ -867,30 +878,51 @@ func (crs *CopRuntimeStats) String() string {

procTimes, totalTime, totalTasks, totalLoops, totalThreads, totalTiFlashScanContext, totalTiFlashWaitSummary := crs.MergeBasicStats()
avgTime := time.Duration(totalTime.Nanoseconds() / int64(totalTasks))
isTiFlashCop := crs.storeType == "tiflash"
isTiFlashCop := crs.storeType == kv.TiFlash

buf := bytes.NewBuffer(make([]byte, 0, 16))
{
printTiFlashSpecificInfo := func() {
if isTiFlashCop {
fmt.Fprintf(buf, ", threads:%d}", totalThreads)
buf.WriteString(", ")
buf.WriteString("threads:")
buf.WriteString(strconv.Itoa(int(totalThreads)))
buf.WriteString("}")
if !totalTiFlashWaitSummary.CanBeIgnored() {
buf.WriteString(", " + totalTiFlashWaitSummary.String())
buf.WriteString(", ")
buf.WriteString(totalTiFlashWaitSummary.String())
}
if !totalTiFlashScanContext.Empty() {
buf.WriteString(", " + totalTiFlashScanContext.String())
buf.WriteString(", ")
buf.WriteString(totalTiFlashScanContext.String())
}
} else {
buf.WriteString("}")
}
}
if totalTasks == 1 {
fmt.Fprintf(buf, "%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(time.Duration(procTimes.GetPercentile(0))), totalLoops)
buf.WriteString(crs.storeType.Name())
buf.WriteString("_task:{time:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0))))
buf.WriteString(", loops:")
buf.WriteString(strconv.Itoa(int(totalLoops)))
printTiFlashSpecificInfo()
} else {
fmt.Fprintf(buf, "%v_task:{proc max:%v, min:%v, avg: %v, p80:%v, p95:%v, iters:%v, tasks:%v",
crs.storeType, FormatDuration(time.Duration(procTimes.GetMax().GetFloat64())), FormatDuration(time.Duration(procTimes.GetMin().GetFloat64())), FormatDuration(avgTime),
FormatDuration(time.Duration(procTimes.GetPercentile(0.8))), FormatDuration(time.Duration(procTimes.GetPercentile(0.95))), totalLoops, totalTasks)
buf.WriteString(crs.storeType.Name())
buf.WriteString("_task:{proc max:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetMax().GetFloat64())))
buf.WriteString(", min:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetMin().GetFloat64())))
buf.WriteString(", avg: ")
buf.WriteString(FormatDuration(avgTime))
buf.WriteString(", p80:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.8))))
buf.WriteString(", p95:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.95))))
buf.WriteString(", iters:")
buf.WriteString(strconv.Itoa(int(totalLoops)))
buf.WriteString(", tasks:")
buf.WriteString(strconv.Itoa(int(totalTasks)))
printTiFlashSpecificInfo()
}
}
Expand Down Expand Up @@ -1434,11 +1466,16 @@ func (e *BasicRuntimeStats) String() string {
totalTime := e.consume.Load()
openTime := e.open.Load()
closeTime := e.close.Load()
str.WriteString(fmt.Sprintf("%stime:", timePrefix))
str.WriteString(timePrefix)
str.WriteString("time:")
str.WriteString(FormatDuration(time.Duration(totalTime)))
str.WriteString(fmt.Sprintf(", %sopen:", timePrefix))
str.WriteString(", ")
str.WriteString(timePrefix)
str.WriteString("open:")
str.WriteString(FormatDuration(time.Duration(openTime)))
str.WriteString(fmt.Sprintf(", %sclose:", timePrefix))
str.WriteString(", ")
str.WriteString(timePrefix)
str.WriteString("close:")
str.WriteString(FormatDuration(time.Duration(closeTime)))
str.WriteString(", loops:")
str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
Expand Down Expand Up @@ -1562,7 +1599,7 @@ func (e *RuntimeStatsColl) GetCopStats(planID int) *CopRuntimeStats {
}

// GetOrCreateCopStats gets the CopRuntimeStats specified by planID; if it does not exist, a new one is created.
func (e *RuntimeStatsColl) GetOrCreateCopStats(planID int, storeType string) *CopRuntimeStats {
func (e *RuntimeStatsColl) GetOrCreateCopStats(planID int, storeType kv.StoreType) *CopRuntimeStats {
e.mu.Lock()
defer e.mu.Unlock()
copStats, ok := e.copStats[planID]
Expand All @@ -1589,7 +1626,7 @@ func getPlanIDFromExecutionSummary(summary *tipb.ExecutorExecutionSummary) (int,
}

// RecordOneCopTask records a specific cop task's execution detail.
func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, address string, summary *tipb.ExecutorExecutionSummary) int {
func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType kv.StoreType, address string, summary *tipb.ExecutorExecutionSummary) int {
// for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in
// summary, use it to overwrite the planID
if id, valid := getPlanIDFromExecutionSummary(summary); valid {
Expand All @@ -1601,13 +1638,13 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, addres
}

// RecordScanDetail records a specific cop task's scan detail.
func (e *RuntimeStatsColl) RecordScanDetail(planID int, storeType string, detail *util.ScanDetail) {
func (e *RuntimeStatsColl) RecordScanDetail(planID int, storeType kv.StoreType, detail *util.ScanDetail) {
copStats := e.GetOrCreateCopStats(planID, storeType)
copStats.scanDetail.Merge(detail)
}

// RecordTimeDetail records a specific cop task's time detail.
func (e *RuntimeStatsColl) RecordTimeDetail(planID int, storeType string, detail *util.TimeDetail) {
func (e *RuntimeStatsColl) RecordTimeDetail(planID int, storeType kv.StoreType, detail *util.TimeDetail) {
copStats := e.GetOrCreateCopStats(planID, storeType)
if detail != nil {
copStats.timeDetail.Merge(detail)
Expand Down Expand Up @@ -1675,20 +1712,23 @@ func (e *RuntimeStatsWithConcurrencyInfo) Clone() RuntimeStats {

// String implements the RuntimeStats interface.
func (e *RuntimeStatsWithConcurrencyInfo) String() string {
var result string
buf := bytes.NewBuffer(make([]byte, 0, 8))
if len(e.concurrency) > 0 {
for i, concurrency := range e.concurrency {
if i > 0 {
result += ", "
buf.WriteString(", ")
}
if concurrency.concurrencyNum > 0 {
result += fmt.Sprintf("%s:%d", concurrency.concurrencyName, concurrency.concurrencyNum)
buf.WriteString(concurrency.concurrencyName)
buf.WriteByte(':')
buf.WriteString(strconv.Itoa(concurrency.concurrencyNum))
} else {
result += fmt.Sprintf("%s:OFF", concurrency.concurrencyName)
buf.WriteString(concurrency.concurrencyName)
buf.WriteString(":OFF")
}
}
}
return result
return buf.String()
}

// Merge implements the RuntimeStats interface.
Expand Down Expand Up @@ -1790,11 +1830,11 @@ func (e *RuntimeStatsWithCommit) String() string {
buf.WriteString(FormatDuration(time.Duration(commitBackoffTime)))
if len(e.Commit.Mu.PrewriteBackoffTypes) > 0 {
buf.WriteString(", prewrite type: ")
buf.WriteString(e.formatBackoff(e.Commit.Mu.PrewriteBackoffTypes))
e.formatBackoff(buf, e.Commit.Mu.PrewriteBackoffTypes)
}
if len(e.Commit.Mu.CommitBackoffTypes) > 0 {
buf.WriteString(", commit type: ")
buf.WriteString(e.formatBackoff(e.Commit.Mu.CommitBackoffTypes))
e.formatBackoff(buf, e.Commit.Mu.CommitBackoffTypes)
}
buf.WriteString("}")
}
Expand Down Expand Up @@ -1872,7 +1912,7 @@ func (e *RuntimeStatsWithCommit) String() string {
buf.WriteString(FormatDuration(time.Duration(e.LockKeys.BackoffTime)))
if len(e.LockKeys.Mu.BackoffTypes) > 0 {
buf.WriteString(", type: ")
buf.WriteString(e.formatBackoff(e.LockKeys.Mu.BackoffTypes))
e.formatBackoff(buf, e.LockKeys.Mu.BackoffTypes)
}
buf.WriteString("}")
}
Expand Down Expand Up @@ -1906,9 +1946,9 @@ func (e *RuntimeStatsWithCommit) String() string {
return buf.String()
}

func (*RuntimeStatsWithCommit) formatBackoff(backoffTypes []string) string {
func (*RuntimeStatsWithCommit) formatBackoff(buf *bytes.Buffer, backoffTypes []string) {
if len(backoffTypes) == 0 {
return ""
return
}
tpMap := make(map[string]struct{})
tpArray := []string{}
Expand All @@ -1921,7 +1961,14 @@ func (*RuntimeStatsWithCommit) formatBackoff(backoffTypes []string) string {
tpArray = append(tpArray, tpStr)
}
slices.Sort(tpArray)
return fmt.Sprintf("%v", tpArray)
buf.WriteByte('[')
for i, tp := range tpArray {
if i > 0 {
buf.WriteString(" ")
}
buf.WriteString(tp)
}
buf.WriteByte(']')
}

// FormatDuration formats a duration; it prunes the precision before formatting.
Expand Down Expand Up @@ -1987,7 +2034,10 @@ type RURuntimeStats struct {
// String implements the RuntimeStats interface.
func (e *RURuntimeStats) String() string {
if e.RUDetails != nil {
return fmt.Sprintf("RU:%f", e.RRU()+e.WRU())
buf := bytes.NewBuffer(make([]byte, 0, 8))
buf.WriteString("RU:")
buf.WriteString(strconv.FormatFloat(e.RRU()+e.WRU(), 'f', 2, 64))
return buf.String()
}
return ""
}
Expand Down
Loading