diff --git a/pkg/distsql/select_result.go b/pkg/distsql/select_result.go index 585bfb9a14c9d..3ed761d4d1a78 100644 --- a/pkg/distsql/select_result.go +++ b/pkg/distsql/select_result.go @@ -473,27 +473,27 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro } // FillDummySummariesForTiFlashTasks fills dummy execution summaries for mpp tasks which lack summaries -func FillDummySummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, callee string, storeTypeName string, allPlanIDs []int, recordedPlanIDs map[int]int) { +func FillDummySummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, callee string, storeType kv.StoreType, allPlanIDs []int, recordedPlanIDs map[int]int) { num := uint64(0) dummySummary := &tipb.ExecutorExecutionSummary{TimeProcessedNs: &num, NumProducedRows: &num, NumIterations: &num, ExecutorId: nil} for _, planID := range allPlanIDs { if _, ok := recordedPlanIDs[planID]; !ok { - runtimeStatsColl.RecordOneCopTask(planID, storeTypeName, callee, dummySummary) + runtimeStatsColl.RecordOneCopTask(planID, storeType, callee, dummySummary) } } } // recordExecutionSummariesForTiFlashTasks records mpp task execution summaries -func recordExecutionSummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, executionSummaries []*tipb.ExecutorExecutionSummary, callee string, storeTypeName string, allPlanIDs []int) { +func recordExecutionSummariesForTiFlashTasks(runtimeStatsColl *execdetails.RuntimeStatsColl, executionSummaries []*tipb.ExecutorExecutionSummary, callee string, storeType kv.StoreType, allPlanIDs []int) { var recordedPlanIDs = make(map[int]int) for _, detail := range executionSummaries { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { recordedPlanIDs[runtimeStatsColl. - RecordOneCopTask(-1, storeTypeName, callee, detail)] = 0 + RecordOneCopTask(-1, storeType, callee, detail)] = 0 } } - FillDummySummariesForTiFlashTasks(runtimeStatsColl, callee, storeTypeName, allPlanIDs, recordedPlanIDs) + FillDummySummariesForTiFlashTasks(runtimeStatsColl, callee, storeType, allPlanIDs, recordedPlanIDs) } func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) (err error) { @@ -524,10 +524,10 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr r.stats.mergeCopRuntimeStats(copStats, respTime) if copStats.ScanDetail != nil && len(r.copPlanIDs) > 0 { - r.ctx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), copStats.ScanDetail) + r.ctx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType, copStats.ScanDetail) } if len(r.copPlanIDs) > 0 { - r.ctx.RuntimeStatsColl.RecordTimeDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), &copStats.TimeDetail) + r.ctx.RuntimeStatsColl.RecordTimeDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType, &copStats.TimeDetail) } // If hasExecutor is true, it means the summary is returned from TiFlash. 
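The hunks above replace the free-form `storeTypeName string` parameter with the typed `kv.StoreType`, so callers hand over `kv.TiKV`/`kv.TiFlash` directly instead of `r.storeType.Name()`. A minimal, self-contained sketch of the shape this relies on — the local `StoreType` below only mirrors the identifiers that appear in the diff (`TiKV`, `TiFlash`, `Name()`); the real definition lives in `pkg/kv`:

```go
package main

import "fmt"

// StoreType mirrors the shape of pkg/kv's kv.StoreType as used by this diff:
// a small integer enum with a Name() accessor, instead of a name string that
// gets threaded through every helper.
type StoreType uint8

const (
	TiKV StoreType = iota
	TiFlash
)

// Name renders the store type only at the point where a string is needed,
// e.g. when building the "tikv_task:{...}" / "tiflash_task:{...}" prefix.
func (t StoreType) Name() string {
	switch t {
	case TiKV:
		return "tikv"
	case TiFlash:
		return "tiflash"
	}
	return "unspecified"
}

// recordOneCopTask is a hypothetical stand-in for RuntimeStatsColl.RecordOneCopTask:
// the branch compares against a typed constant, not a string literal.
func recordOneCopTask(storeType StoreType) string {
	if storeType == TiFlash {
		return "recorded a TiFlash task"
	}
	return "recorded a " + storeType.Name() + " task"
}

func main() {
	fmt.Println(recordOneCopTask(TiKV))
	fmt.Println(recordOneCopTask(TiFlash))
}
```

With the enum threaded through, the store name string is only materialized where the output actually needs it, for example when `CopRuntimeStats.String()` writes the `tikv_task:`/`tiflash_task:` prefix via `crs.storeType.Name()`.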
@@ -551,7 +551,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr r.ctx.CPUUsage.MergeTikvCPUTime(copStats.TimeDetail.ProcessTime) } if hasExecutor { - recordExecutionSummariesForTiFlashTasks(r.ctx.RuntimeStatsColl, r.selectResp.GetExecutionSummaries(), callee, r.storeType.Name(), r.copPlanIDs) + recordExecutionSummariesForTiFlashTasks(r.ctx.RuntimeStatsColl, r.selectResp.GetExecutionSummaries(), callee, r.storeType, r.copPlanIDs) } else { // For cop task cases, we still need this protection. if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) { @@ -570,7 +570,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr detail.NumProducedRows != nil && detail.NumIterations != nil { planID := r.copPlanIDs[i] r.ctx.RuntimeStatsColl. - RecordOneCopTask(planID, r.storeType.Name(), callee, detail) + RecordOneCopTask(planID, r.storeType, callee, detail) } } } diff --git a/pkg/distsql/select_result_test.go b/pkg/distsql/select_result_test.go index 478f9dcd8d205..9839a65886091 100644 --- a/pkg/distsql/select_result_test.go +++ b/pkg/distsql/select_result_test.go @@ -56,5 +56,5 @@ func TestUpdateCopRuntimeStats(t *testing.T) { require.Equal(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries())) sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0) - require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String()) + require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, kv.TiKV).String()) } diff --git a/pkg/executor/internal/exec/BUILD.bazel b/pkg/executor/internal/exec/BUILD.bazel index 9b970add795e9..e80cea2a91bbc 100644 --- a/pkg/executor/internal/exec/BUILD.bazel +++ b/pkg/executor/internal/exec/BUILD.bazel @@ -42,6 +42,7 @@ go_test( deps = [ ":exec", "//pkg/domain", + "//pkg/kv", "//pkg/parser/model", "//pkg/sessionctx/stmtctx", "//pkg/statistics", diff --git a/pkg/executor/internal/exec/indexusage_test.go b/pkg/executor/internal/exec/indexusage_test.go index 254f85b08b060..8ecf11ec6a23c 100644 --- a/pkg/executor/internal/exec/indexusage_test.go +++ b/pkg/executor/internal/exec/indexusage_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" @@ -65,7 +66,7 @@ func TestIndexUsageReporter(t *testing.T) { rows := uint64(2024) zero := uint64(0) executorID := "test-executor" - runtimeStatsColl.GetOrCreateCopStats(planID, "test-store").RecordOneCopTask("1", &tipb.ExecutorExecutionSummary{ + runtimeStatsColl.GetOrCreateCopStats(planID, kv.TiKV).RecordOneCopTask("1", &tipb.ExecutorExecutionSummary{ TimeProcessedNs: &zero, NumProducedRows: &rows, NumIterations: &zero, diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index ab6e445a75a13..0bfb0b3780452 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -640,7 +640,7 @@ func (c *localMppCoordinator) handleAllReports() error { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != 
nil { recordedPlanIDs[c.sessionCtx.GetSessionVars().StmtCtx.RuntimeStatsColl. - RecordOneCopTask(-1, kv.TiFlash.Name(), report.mppReq.Meta.GetAddress(), detail)] = 0 + RecordOneCopTask(-1, kv.TiFlash, report.mppReq.Meta.GetAddress(), detail)] = 0 } } if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { @@ -649,7 +649,7 @@ func (c *localMppCoordinator) handleAllReports() error { } } } - distsql.FillDummySummariesForTiFlashTasks(c.sessionCtx.GetSessionVars().StmtCtx.RuntimeStatsColl, "", kv.TiFlash.Name(), c.planIDs, recordedPlanIDs) + distsql.FillDummySummariesForTiFlashTasks(c.sessionCtx.GetSessionVars().StmtCtx.RuntimeStatsColl, "", kv.TiFlash, c.planIDs, recordedPlanIDs) case <-time.After(receiveReportTimeout): metrics.MppCoordinatorStatsReportNotReceived.Inc() logutil.BgLogger().Warn(fmt.Sprintf("Mpp coordinator not received all reports within %d seconds", int(receiveReportTimeout.Seconds())), diff --git a/pkg/util/execdetails/BUILD.bazel b/pkg/util/execdetails/BUILD.bazel index fb5bfb363c53d..7c4335b4a667e 100644 --- a/pkg/util/execdetails/BUILD.bazel +++ b/pkg/util/execdetails/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/util/execdetails", visibility = ["//visibility:public"], deps = [ + "//pkg/kv", "@com_github_influxdata_tdigest//:tdigest", "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_pingcap_tipb//go-tipb", @@ -25,6 +26,7 @@ go_test( flaky = True, race = "on", deps = [ + "//pkg/kv", "//pkg/testkit/testsetup", "@com_github_pingcap_tipb//go-tipb", "@com_github_stretchr_testify//require", diff --git a/pkg/util/execdetails/execdetails.go b/pkg/util/execdetails/execdetails.go index 53a588e4357b4..0e1f41da8c8e8 100644 --- a/pkg/util/execdetails/execdetails.go +++ b/pkg/util/execdetails/execdetails.go @@ -28,6 +28,7 @@ import ( "github.com/influxdata/tdigest" "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" @@ -553,7 +554,7 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) { } type basicCopRuntimeStats struct { - storeType string + storeType kv.StoreType BasicRuntimeStats threads int32 totalTasks int32 @@ -687,13 +688,24 @@ func (p *Percentile[valueType]) Sum() float64 { // String implements the RuntimeStats interface. 
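The `basicCopRuntimeStats.String` rewrite that follows drops `fmt.Sprintf` in favor of explicit `bytes.Buffer` writes, avoiding fmt's reflection-driven formatting and the intermediate strings it allocates. A condensed, standalone sketch of the pattern — the struct and field names here are hypothetical, and `time.Duration.String` stands in for the package's `FormatDuration`:

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
	"time"
)

// taskStats is a hypothetical stand-in for basicCopRuntimeStats.
type taskStats struct {
	consume time.Duration
	loops   int32
	threads int32
	tiflash bool
}

// String builds "time:<d>, loops:<n>[, threads:<n>]" with direct buffer
// writes instead of fmt.Sprintf, mirroring the approach taken in the diff.
func (s taskStats) String() string {
	buf := bytes.NewBuffer(make([]byte, 0, 16))
	buf.WriteString("time:")
	buf.WriteString(s.consume.String())
	buf.WriteString(", loops:")
	buf.WriteString(strconv.Itoa(int(s.loops)))
	if s.tiflash {
		buf.WriteString(", threads:")
		buf.WriteString(strconv.Itoa(int(s.threads)))
	}
	return buf.String()
}

func main() {
	fmt.Println(taskStats{consume: time.Second, loops: 2, threads: 4, tiflash: true})
	fmt.Println(taskStats{consume: time.Millisecond, loops: 1})
}
```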
func (e *basicCopRuntimeStats) String() string { - if e.storeType == "tiflash" { + buf := bytes.NewBuffer(make([]byte, 0, 16)) + buf.WriteString("time:") + buf.WriteString(FormatDuration(time.Duration(e.consume.Load()))) + buf.WriteString(", loops:") + buf.WriteString(strconv.Itoa(int(e.loop.Load()))) + if e.storeType == kv.TiFlash { + buf.WriteString(", threads:") + buf.WriteString(strconv.Itoa(int(e.threads))) + buf.WriteString(", ") if e.tiflashWaitSummary.CanBeIgnored() { - return fmt.Sprintf("time:%v, loops:%d, threads:%d, %s", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load(), e.threads, e.tiflashScanContext.String()) + buf.WriteString(e.tiflashScanContext.String()) + } else { + buf.WriteString(e.tiflashWaitSummary.String()) + buf.WriteString(", ") + buf.WriteString(e.tiflashScanContext.String()) } - return fmt.Sprintf("time:%v, loops:%d, threads:%d, %s, %s", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load(), e.threads, e.tiflashWaitSummary.String(), e.tiflashScanContext.String()) } - return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load()) + return buf.String() } // Clone implements the RuntimeStats interface. @@ -817,8 +829,7 @@ type CopRuntimeStats struct { stats map[string]*basicCopRuntimeStats scanDetail *util.ScanDetail timeDetail *util.TimeDetail - // do not use kv.StoreType because it will meet cycle import error - storeType string + storeType kv.StoreType sync.Mutex } @@ -875,30 +886,51 @@ func (crs *CopRuntimeStats) String() string { procTimes, totalTime, totalTasks, totalLoops, totalThreads, totalTiFlashScanContext, totalTiFlashWaitSummary := crs.MergeBasicStats() avgTime := time.Duration(totalTime.Nanoseconds() / int64(totalTasks)) - isTiFlashCop := crs.storeType == "tiflash" + isTiFlashCop := crs.storeType == kv.TiFlash buf := bytes.NewBuffer(make([]byte, 0, 16)) { printTiFlashSpecificInfo := func() { if isTiFlashCop { - fmt.Fprintf(buf, ", threads:%d}", totalThreads) + buf.WriteString(", ") + buf.WriteString("threads:") + buf.WriteString(strconv.Itoa(int(totalThreads))) + buf.WriteString("}") if !totalTiFlashWaitSummary.CanBeIgnored() { - buf.WriteString(", " + totalTiFlashWaitSummary.String()) + buf.WriteString(", ") + buf.WriteString(totalTiFlashWaitSummary.String()) } if !totalTiFlashScanContext.Empty() { - buf.WriteString(", " + totalTiFlashScanContext.String()) + buf.WriteString(", ") + buf.WriteString(totalTiFlashScanContext.String()) } } else { buf.WriteString("}") } } if totalTasks == 1 { - fmt.Fprintf(buf, "%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(time.Duration(procTimes.GetPercentile(0))), totalLoops) + buf.WriteString(crs.storeType.Name()) + buf.WriteString("_task:{time:") + buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0)))) + buf.WriteString(", loops:") + buf.WriteString(strconv.Itoa(int(totalLoops))) printTiFlashSpecificInfo() } else { - fmt.Fprintf(buf, "%v_task:{proc max:%v, min:%v, avg: %v, p80:%v, p95:%v, iters:%v, tasks:%v", - crs.storeType, FormatDuration(time.Duration(procTimes.GetMax().GetFloat64())), FormatDuration(time.Duration(procTimes.GetMin().GetFloat64())), FormatDuration(avgTime), - FormatDuration(time.Duration(procTimes.GetPercentile(0.8))), FormatDuration(time.Duration(procTimes.GetPercentile(0.95))), totalLoops, totalTasks) + buf.WriteString(crs.storeType.Name()) + buf.WriteString("_task:{proc max:") + buf.WriteString(FormatDuration(time.Duration(procTimes.GetMax().GetFloat64()))) + buf.WriteString(", min:") + 
buf.WriteString(FormatDuration(time.Duration(procTimes.GetMin().GetFloat64()))) + buf.WriteString(", avg: ") + buf.WriteString(FormatDuration(avgTime)) + buf.WriteString(", p80:") + buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.8)))) + buf.WriteString(", p95:") + buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.95)))) + buf.WriteString(", iters:") + buf.WriteString(strconv.Itoa(int(totalLoops))) + buf.WriteString(", tasks:") + buf.WriteString(strconv.Itoa(int(totalTasks))) printTiFlashSpecificInfo() } } @@ -1442,11 +1474,16 @@ func (e *BasicRuntimeStats) String() string { totalTime := e.consume.Load() openTime := e.open.Load() closeTime := e.close.Load() - str.WriteString(fmt.Sprintf("%stime:", timePrefix)) + str.WriteString(timePrefix) + str.WriteString("time:") str.WriteString(FormatDuration(time.Duration(totalTime))) - str.WriteString(fmt.Sprintf(", %sopen:", timePrefix)) + str.WriteString(", ") + str.WriteString(timePrefix) + str.WriteString("open:") str.WriteString(FormatDuration(time.Duration(openTime))) - str.WriteString(fmt.Sprintf(", %sclose:", timePrefix)) + str.WriteString(", ") + str.WriteString(timePrefix) + str.WriteString("close:") str.WriteString(FormatDuration(time.Duration(closeTime))) str.WriteString(", loops:") str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10)) @@ -1570,7 +1607,7 @@ func (e *RuntimeStatsColl) GetCopStats(planID int) *CopRuntimeStats { } // GetOrCreateCopStats gets the CopRuntimeStats specified by planID, if not exists a new one will be created. -func (e *RuntimeStatsColl) GetOrCreateCopStats(planID int, storeType string) *CopRuntimeStats { +func (e *RuntimeStatsColl) GetOrCreateCopStats(planID int, storeType kv.StoreType) *CopRuntimeStats { e.mu.Lock() defer e.mu.Unlock() copStats, ok := e.copStats[planID] @@ -1597,7 +1634,7 @@ func getPlanIDFromExecutionSummary(summary *tipb.ExecutorExecutionSummary) (int, } // RecordOneCopTask records a specific cop tasks's execution detail. -func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, address string, summary *tipb.ExecutorExecutionSummary) int { +func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType kv.StoreType, address string, summary *tipb.ExecutorExecutionSummary) int { // for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in // summary, use it overwrite the planID if id, valid := getPlanIDFromExecutionSummary(summary); valid { @@ -1609,13 +1646,13 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, addres } // RecordScanDetail records a specific cop tasks's cop detail. -func (e *RuntimeStatsColl) RecordScanDetail(planID int, storeType string, detail *util.ScanDetail) { +func (e *RuntimeStatsColl) RecordScanDetail(planID int, storeType kv.StoreType, detail *util.ScanDetail) { copStats := e.GetOrCreateCopStats(planID, storeType) copStats.scanDetail.Merge(detail) } // RecordTimeDetail records a specific cop tasks's time detail. -func (e *RuntimeStatsColl) RecordTimeDetail(planID int, storeType string, detail *util.TimeDetail) { +func (e *RuntimeStatsColl) RecordTimeDetail(planID int, storeType kv.StoreType, detail *util.TimeDetail) { copStats := e.GetOrCreateCopStats(planID, storeType) if detail != nil { copStats.timeDetail.Merge(detail) @@ -1683,20 +1720,23 @@ func (e *RuntimeStatsWithConcurrencyInfo) Clone() RuntimeStats { // String implements the RuntimeStats interface. 
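The same Sprintf-to-buffer rewrite is applied to `CopRuntimeStats.String` and `BasicRuntimeStats.String` above. To sanity-check the win locally, a throwaway benchmark along these lines (not part of this PR; save it as a `_test.go` file and run `go test -bench=. -benchmem`) compares the two styles on a representative small string:

```go
package execdetailsbench

import (
	"bytes"
	"fmt"
	"strconv"
	"testing"
)

// BenchmarkSprintf formats a small "key:value, key:value" string via fmt.
func BenchmarkSprintf(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_ = fmt.Sprintf("time:%v, loops:%d", "1s", 42)
	}
}

// BenchmarkBufferWrites builds the same string with explicit buffer writes,
// the pattern this PR switches to. Numbers vary by machine and Go version.
func BenchmarkBufferWrites(b *testing.B) {
	for i := 0; i < b.N; i++ {
		buf := bytes.NewBuffer(make([]byte, 0, 16))
		buf.WriteString("time:")
		buf.WriteString("1s")
		buf.WriteString(", loops:")
		buf.WriteString(strconv.Itoa(42))
		_ = buf.String()
	}
}
```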
func (e *RuntimeStatsWithConcurrencyInfo) String() string { - var result string + buf := bytes.NewBuffer(make([]byte, 0, 8)) if len(e.concurrency) > 0 { for i, concurrency := range e.concurrency { if i > 0 { - result += ", " + buf.WriteString(", ") } if concurrency.concurrencyNum > 0 { - result += fmt.Sprintf("%s:%d", concurrency.concurrencyName, concurrency.concurrencyNum) + buf.WriteString(concurrency.concurrencyName) + buf.WriteByte(':') + buf.WriteString(strconv.Itoa(concurrency.concurrencyNum)) } else { - result += fmt.Sprintf("%s:OFF", concurrency.concurrencyName) + buf.WriteString(concurrency.concurrencyName) + buf.WriteString(":OFF") } } } - return result + return buf.String() } // Merge implements the RuntimeStats interface. @@ -1798,11 +1838,11 @@ func (e *RuntimeStatsWithCommit) String() string { buf.WriteString(FormatDuration(time.Duration(commitBackoffTime))) if len(e.Commit.Mu.PrewriteBackoffTypes) > 0 { buf.WriteString(", prewrite type: ") - buf.WriteString(e.formatBackoff(e.Commit.Mu.PrewriteBackoffTypes)) + e.formatBackoff(buf, e.Commit.Mu.PrewriteBackoffTypes) } if len(e.Commit.Mu.CommitBackoffTypes) > 0 { buf.WriteString(", commit type: ") - buf.WriteString(e.formatBackoff(e.Commit.Mu.CommitBackoffTypes)) + e.formatBackoff(buf, e.Commit.Mu.CommitBackoffTypes) } buf.WriteString("}") } @@ -1880,7 +1920,7 @@ func (e *RuntimeStatsWithCommit) String() string { buf.WriteString(FormatDuration(time.Duration(e.LockKeys.BackoffTime))) if len(e.LockKeys.Mu.BackoffTypes) > 0 { buf.WriteString(", type: ") - buf.WriteString(e.formatBackoff(e.LockKeys.Mu.BackoffTypes)) + e.formatBackoff(buf, e.LockKeys.Mu.BackoffTypes) } buf.WriteString("}") } @@ -1914,9 +1954,9 @@ func (e *RuntimeStatsWithCommit) String() string { return buf.String() } -func (*RuntimeStatsWithCommit) formatBackoff(backoffTypes []string) string { +func (*RuntimeStatsWithCommit) formatBackoff(buf *bytes.Buffer, backoffTypes []string) { if len(backoffTypes) == 0 { - return "" + return } tpMap := make(map[string]struct{}) tpArray := []string{} @@ -1929,7 +1969,14 @@ func (*RuntimeStatsWithCommit) formatBackoff(backoffTypes []string) string { tpArray = append(tpArray, tpStr) } slices.Sort(tpArray) - return fmt.Sprintf("%v", tpArray) + buf.WriteByte('[') + for i, tp := range tpArray { + if i > 0 { + buf.WriteString(" ") + } + buf.WriteString(tp) + } + buf.WriteByte(']') } // FormatDuration uses to format duration, this function will prune precision before format duration. @@ -1995,7 +2042,10 @@ type RURuntimeStats struct { // String implements the RuntimeStats interface. 
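`formatBackoff` now writes straight into the caller's buffer, rendering the sorted slice as `[a b c]` by hand instead of going through `fmt.Sprintf("%v", tpArray)`. A quick standalone check (not part of the PR; the backoff type names are only illustrative) that the two renderings agree:

```go
package main

import (
	"bytes"
	"fmt"
	"slices"
)

func main() {
	// Hypothetical backoff type names; the real values come from client-go.
	tpArray := []string{"txnLock", "regionMiss"}
	slices.Sort(tpArray)

	// Old rendering: fmt's %v of a []string prints space-separated elements in brackets.
	old := fmt.Sprintf("%v", tpArray)

	// New rendering: hand-written brackets and separators, as in the diff.
	buf := bytes.NewBuffer(nil)
	buf.WriteByte('[')
	for i, tp := range tpArray {
		if i > 0 {
			buf.WriteString(" ")
		}
		buf.WriteString(tp)
	}
	buf.WriteByte(']')

	fmt.Println(old == buf.String(), old) // true [regionMiss txnLock]
}
```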
func (e *RURuntimeStats) String() string { if e.RUDetails != nil { - return fmt.Sprintf("RU:%f", e.RRU()+e.WRU()) + buf := bytes.NewBuffer(make([]byte, 0, 8)) + buf.WriteString("RU:") + buf.WriteString(strconv.FormatFloat(e.RRU()+e.WRU(), 'f', 2, 64)) + return buf.String() } return "" } diff --git a/pkg/util/execdetails/execdetails_test.go b/pkg/util/execdetails/execdetails_test.go index 6128d085389f1..85f6a8493d885 100644 --- a/pkg/util/execdetails/execdetails_test.go +++ b/pkg/util/execdetails/execdetails_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/util" @@ -205,10 +206,10 @@ func TestCopRuntimeStats(t *testing.T) { tableScanID := 1 aggID := 2 tableReaderID := 3 - stats.RecordOneCopTask(tableScanID, "tikv", "8.8.8.8", mockExecutorExecutionSummary(1, 1, 1)) - stats.RecordOneCopTask(tableScanID, "tikv", "8.8.8.9", mockExecutorExecutionSummary(2, 2, 2)) - stats.RecordOneCopTask(aggID, "tikv", "8.8.8.8", mockExecutorExecutionSummary(3, 3, 3)) - stats.RecordOneCopTask(aggID, "tikv", "8.8.8.9", mockExecutorExecutionSummary(4, 4, 4)) + stats.RecordOneCopTask(tableScanID, kv.TiKV, "8.8.8.8", mockExecutorExecutionSummary(1, 1, 1)) + stats.RecordOneCopTask(tableScanID, kv.TiKV, "8.8.8.9", mockExecutorExecutionSummary(2, 2, 2)) + stats.RecordOneCopTask(aggID, kv.TiKV, "8.8.8.8", mockExecutorExecutionSummary(3, 3, 3)) + stats.RecordOneCopTask(aggID, kv.TiKV, "8.8.8.9", mockExecutorExecutionSummary(4, 4, 4)) scanDetail := &util.ScanDetail{ TotalKeys: 15, ProcessedKeys: 10, @@ -219,10 +220,10 @@ func TestCopRuntimeStats(t *testing.T) { RocksdbBlockReadCount: 20, RocksdbBlockReadByte: 100, } - stats.RecordScanDetail(tableScanID, "tikv", scanDetail) + stats.RecordScanDetail(tableScanID, kv.TiKV, scanDetail) require.True(t, stats.ExistsCopStats(tableScanID)) - cop := stats.GetOrCreateCopStats(tableScanID, "tikv") + cop := stats.GetOrCreateCopStats(tableScanID, kv.TiKV) expected := "tikv_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2}, " + "scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15, rocksdb: {delete_skipped_count: 5, key_skipped_count: 1, block: {cache_hit_count: 10, read_count: 20, read_byte: 100 Bytes}}}" require.Equal(t, expected, cop.String()) @@ -235,7 +236,7 @@ func TestCopRuntimeStats(t *testing.T) { newCopStats.Record(time.Second, 10) copStats.Merge(newCopStats) require.Equal(t, "time:1s, loops:2", copStats.String()) - require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, "tikv").String()) + require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, kv.TiKV).String()) rootStats := stats.GetRootStats(tableReaderID) require.NotNil(t, rootStats) @@ -259,10 +260,10 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) { tableScanID := 1 aggID := 2 tableReaderID := 3 - stats.RecordOneCopTask(tableScanID, "tiflash", "8.8.8.8", mockExecutorExecutionSummaryForTiFlash(1, 1, 1, 1, 8192, 0, 15, 200, 40, 10, 4, 1, 100, 50, 30000000, 20000000, 10000000, "tablescan_"+strconv.Itoa(tableScanID))) - stats.RecordOneCopTask(tableScanID, "tiflash", "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(2, 2, 2, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 20000000, 10000000, 5000000, "tablescan_"+strconv.Itoa(tableScanID))) - stats.RecordOneCopTask(aggID, "tiflash", "8.8.8.8", 
mockExecutorExecutionSummaryForTiFlash(3, 3, 3, 1, 12000, 6000, 60, 1000, 20, 5, 1, 0, 20, 0, 0, 0, 0, "aggregation_"+strconv.Itoa(aggID))) - stats.RecordOneCopTask(aggID, "tiflash", "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(4, 4, 4, 1, 8192, 80000, 40, 2000, 30, 1, 1, 0, 0, 0, 0, 0, 0, "aggregation_"+strconv.Itoa(aggID))) + stats.RecordOneCopTask(tableScanID, kv.TiFlash, "8.8.8.8", mockExecutorExecutionSummaryForTiFlash(1, 1, 1, 1, 8192, 0, 15, 200, 40, 10, 4, 1, 100, 50, 30000000, 20000000, 10000000, "tablescan_"+strconv.Itoa(tableScanID))) + stats.RecordOneCopTask(tableScanID, kv.TiFlash, "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(2, 2, 2, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 20000000, 10000000, 5000000, "tablescan_"+strconv.Itoa(tableScanID))) + stats.RecordOneCopTask(aggID, kv.TiFlash, "8.8.8.8", mockExecutorExecutionSummaryForTiFlash(3, 3, 3, 1, 12000, 6000, 60, 1000, 20, 5, 1, 0, 20, 0, 0, 0, 0, "aggregation_"+strconv.Itoa(aggID))) + stats.RecordOneCopTask(aggID, kv.TiFlash, "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(4, 4, 4, 1, 8192, 80000, 40, 2000, 30, 1, 1, 0, 0, 0, 0, 0, 0, "aggregation_"+strconv.Itoa(aggID))) scanDetail := &util.ScanDetail{ TotalKeys: 10, ProcessedKeys: 10, @@ -272,10 +273,10 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) { RocksdbBlockReadCount: 10, RocksdbBlockReadByte: 100, } - stats.RecordScanDetail(tableScanID, "tiflash", scanDetail) + stats.RecordScanDetail(tableScanID, kv.TiFlash, scanDetail) require.True(t, stats.ExistsCopStats(tableScanID)) - cop := stats.GetOrCreateCopStats(tableScanID, "tiflash") + cop := stats.GetOrCreateCopStats(tableScanID, kv.TiFlash) require.Equal(t, "tiflash_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2, threads:2}, tiflash_wait: {minTSO_wait: 20ms, pipeline_breaker_wait: 5ms, pipeline_queue_wait: 10ms}, tiflash_scan:{mvcc_input_rows:0, mvcc_input_bytes:0, mvcc_output_rows:0, lm_skip_rows:0, local_regions:10, remote_regions:4, tot_learner_read:1ms, region_balance:none, delta_rows:0, delta_bytes:0, segments:0, stale_read_regions:0, tot_build_snapshot:40ms, tot_build_bitmap:0ms, tot_build_inputstream:0ms, min_local_stream:0ms, max_local_stream:0ms, dtfile:{data_scanned_rows:8192, data_skipped_rows:0, mvcc_scanned_rows:0, mvcc_skipped_rows:0, lm_filter_scanned_rows:0, lm_filter_skipped_rows:0, tot_rs_index_check:15ms, tot_read:202ms, disagg_cache_hit_bytes: 100, disagg_cache_miss_bytes: 50}}", cop.String()) copStats := cop.stats["8.8.8.8"] @@ -285,7 +286,7 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) { copStats.Record(time.Second, 10) require.Equal(t, "time:1s, loops:2, threads:1, tiflash_wait: {minTSO_wait: 30ms, pipeline_breaker_wait: 10ms, pipeline_queue_wait: 20ms}, tiflash_scan:{mvcc_input_rows:0, mvcc_input_bytes:0, mvcc_output_rows:0, lm_skip_rows:0, local_regions:10, remote_regions:4, tot_learner_read:1ms, region_balance:none, delta_rows:0, delta_bytes:0, segments:0, stale_read_regions:0, tot_build_snapshot:40ms, tot_build_bitmap:0ms, tot_build_inputstream:0ms, min_local_stream:0ms, max_local_stream:0ms, dtfile:{data_scanned_rows:8192, data_skipped_rows:0, mvcc_scanned_rows:0, mvcc_skipped_rows:0, lm_filter_scanned_rows:0, lm_filter_skipped_rows:0, tot_rs_index_check:15ms, tot_read:200ms, disagg_cache_hit_bytes: 100, disagg_cache_miss_bytes: 50}}", copStats.String()) expected := "tiflash_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2, threads:2}, tiflash_scan:{mvcc_input_rows:0, mvcc_input_bytes:0, mvcc_output_rows:0, lm_skip_rows:0, 
local_regions:6, remote_regions:2, tot_learner_read:0ms, region_balance:none, delta_rows:0, delta_bytes:0, segments:0, stale_read_regions:0, tot_build_snapshot:50ms, tot_build_bitmap:0ms, tot_build_inputstream:0ms, min_local_stream:0ms, max_local_stream:0ms, dtfile:{data_scanned_rows:20192, data_skipped_rows:86000, mvcc_scanned_rows:0, mvcc_skipped_rows:0, lm_filter_scanned_rows:0, lm_filter_skipped_rows:0, tot_rs_index_check:100ms, tot_read:3000ms, disagg_cache_hit_bytes: 20, disagg_cache_miss_bytes: 0}}" - require.Equal(t, expected, stats.GetOrCreateCopStats(aggID, "tiflash").String()) + require.Equal(t, expected, stats.GetOrCreateCopStats(aggID, kv.TiFlash).String()) rootStats := stats.GetRootStats(tableReaderID) require.NotNil(t, rootStats) @@ -299,8 +300,8 @@ func TestVectorSearchStats(t *testing.T) { execSummary := mockExecutorExecutionSummaryForTiFlash(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "") execSummary.DetailInfo.(*tipb.ExecutorExecutionSummary_TiflashScanContext).TiflashScanContext.TotalVectorIdxLoadFromS3 = &v - stats.RecordOneCopTask(1, "tiflash", "8.8.8.8", execSummary) - s := stats.GetOrCreateCopStats(1, "tiflash") + stats.RecordOneCopTask(1, kv.TiFlash, "8.8.8.8", execSummary) + s := stats.GetOrCreateCopStats(1, kv.TiFlash) require.Equal(t, "tiflash_task:{time:0s, loops:0, threads:0}, vector_idx:{load:{total:0ms,from_s3:1,from_disk:0,from_cache:0},search:{total:0ms,visited_nodes:0,discarded_nodes:0},read:{vec_total:0ms,others_total:0ms}}, tiflash_scan:{mvcc_input_rows:0, mvcc_input_bytes:0, mvcc_output_rows:0, lm_skip_rows:0, local_regions:0, remote_regions:0, tot_learner_read:0ms, region_balance:none, delta_rows:0, delta_bytes:0, segments:0, stale_read_regions:0, tot_build_snapshot:0ms, tot_build_bitmap:0ms, tot_build_inputstream:0ms, min_local_stream:0ms, max_local_stream:0ms, dtfile:{data_scanned_rows:0, data_skipped_rows:0, mvcc_scanned_rows:0, mvcc_skipped_rows:0, lm_filter_scanned_rows:0, lm_filter_skipped_rows:0, tot_rs_index_check:0ms, tot_read:0ms}}", s.String()) } @@ -545,14 +546,14 @@ func TestCopRuntimeStats2(t *testing.T) { KvReadWallTime: 5 * time.Millisecond, TotalRPCWallTime: 50 * time.Millisecond, } - stats.RecordScanDetail(tableScanID, "tikv", scanDetail) + stats.RecordScanDetail(tableScanID, kv.TiKV, scanDetail) for range 1005 { - stats.RecordOneCopTask(tableScanID, "tikv", "8.8.8.9", mockExecutorExecutionSummary(2, 2, 2)) - stats.RecordScanDetail(tableScanID, "tikv", scanDetail) - stats.RecordTimeDetail(tableScanID, "tikv", timeDetail) + stats.RecordOneCopTask(tableScanID, kv.TiKV, "8.8.8.9", mockExecutorExecutionSummary(2, 2, 2)) + stats.RecordScanDetail(tableScanID, kv.TiKV, scanDetail) + stats.RecordTimeDetail(tableScanID, kv.TiKV, timeDetail) } - cop := stats.GetOrCreateCopStats(tableScanID, "tikv") + cop := stats.GetOrCreateCopStats(tableScanID, kv.TiKV) expected := "tikv_task:{proc max:0s, min:0s, avg: 2ns, p80:2ns, p95:2ns, iters:2010, tasks:1005}, " + "scan_detail: {total_process_keys: 10060, total_process_keys_size: 10060, total_keys: 15090, " + "rocksdb: {delete_skipped_count: 5030, key_skipped_count: 1006, " +