From 76882df39c1d3493211f43281e7171ac55050829 Mon Sep 17 00:00:00 2001 From: "Zhuomin(Charming) Liu" Date: Mon, 14 Sep 2020 14:40:06 +0800 Subject: [PATCH 1/2] cherry pick #19948 to release-4.0 Signed-off-by: ti-srebot --- distsql/distsql_test.go | 26 ++++++++++++++++++ distsql/select_result.go | 58 +++++++++++++++++++++++++++++++++++++++ planner/core/cbo_test.go | 3 ++ store/tikv/coprocessor.go | 3 ++ 4 files changed, 90 insertions(+) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index e65d94224958f..a949bf38f5f2d 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -156,6 +156,32 @@ func (s *testSuite) TestSelectWithRuntimeStats(c *C) { c.Assert(err, IsNil) } +<<<<<<< HEAD +======= +func (s *testSuite) TestSelectResultRuntimeStats(c *C) { + basic := &execdetails.BasicRuntimeStats{} + basic.Record(time.Second, 20) + s1 := &selectResultRuntimeStats{ + copRespTime: []time.Duration{time.Second, time.Millisecond}, + procKeys: []int64{100, 200}, + backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond}, + totalProcessTime: time.Second, + totalWaitTime: time.Second, + rpcStat: tikv.NewRegionRequestRuntimeStats(), + } + s2 := *s1 + stmtStats := execdetails.NewRuntimeStatsColl() + stmtStats.RegisterStats(1, basic) + stmtStats.RegisterStats(1, s1) + stmtStats.RegisterStats(1, &s2) + stats := stmtStats.GetRootStats(1) + expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}" + c.Assert(stats.String(), Equals, expect) + // Test for idempotence. + c.Assert(stats.String(), Equals, expect) +} + +>>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948) func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) { request, err := (&RequestBuilder{}).SetKeyRanges(nil). SetDAGRequest(&tipb.DAGRequest{}). diff --git a/distsql/select_result.go b/distsql/select_result.go index 5c4b83560787b..401afa2219639 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -326,6 +326,7 @@ type selectResultRuntimeStats struct { totalProcessTime time.Duration totalWaitTime time.Duration rpcStat tikv.RegionRequestRuntimeStats + CoprCacheHitNum int64 } func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) { @@ -338,8 +339,51 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntim s.totalProcessTime += copStats.ProcessTime s.totalWaitTime += copStats.WaitTime s.rpcStat.Merge(copStats.RegionRequestRuntimeStats) + if copStats.CoprCacheHit { + s.CoprCacheHitNum++ + } +} + +<<<<<<< HEAD +======= +func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats { + newRs := selectResultRuntimeStats{ + copRespTime: make([]time.Duration, 0, len(s.copRespTime)), + procKeys: make([]int64, 0, len(s.procKeys)), + backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), + rpcStat: tikv.NewRegionRequestRuntimeStats(), + } + newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...) + newRs.procKeys = append(newRs.procKeys, s.procKeys...) + for k, v := range s.backoffSleep { + newRs.backoffSleep[k] += v + } + newRs.totalProcessTime += s.totalProcessTime + newRs.totalWaitTime += s.totalWaitTime + for k, v := range s.rpcStat.Stats { + newRs.rpcStat.Stats[k] = v + } + return &newRs } +func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) { + other, ok := rs.(*selectResultRuntimeStats) + if !ok { + return + } + s.copRespTime = append(s.copRespTime, other.copRespTime...) + s.procKeys = append(s.procKeys, other.procKeys...) + + for k, v := range other.backoffSleep { + s.backoffSleep[k] += v + } + s.totalProcessTime += other.totalProcessTime + s.totalWaitTime += other.totalWaitTime + s.rpcStat.Merge(other.rpcStat) + s.CoprCacheHitNum += other.CoprCacheHitNum +} + +>>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948) func (s *selectResultRuntimeStats) String() string { buf := bytes.NewBuffer(nil) if s.RuntimeStats != nil { @@ -383,6 +427,20 @@ func (s *selectResultRuntimeStats) String() string { } } } +<<<<<<< HEAD +======= + copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] + if copRPC != nil && copRPC.Count > 0 { + delete(s.rpcStat.Stats, tikvrpc.CmdCop) + buf.WriteString(", rpc_num: ") + buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) + buf.WriteString(", rpc_time: ") + buf.WriteString(time.Duration(copRPC.Consume).String()) + } + buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v", + strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64))) + buf.WriteString("}") +>>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948) } copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] if copRPC != nil && copRPC.Count > 0 { diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index 42e85aa568880..2bd17ee0dbfcc 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -98,6 +98,9 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) { execInfo := row[5].(string) c.Assert(strings.Contains(execInfo, "time"), Equals, true) c.Assert(strings.Contains(execInfo, "loops"), Equals, true) + if strings.Contains(row[0].(string), "Reader") || strings.Contains(row[0].(string), "IndexLookUp") { + c.Assert(strings.Contains(execInfo, "copr_cache_hit_ratio"), Equals, true) + } } } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 69e3d18a3cb11..c00b6416b7f6f 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -1056,6 +1056,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon data := make([]byte, len(cacheValue.Data)) copy(data, cacheValue.Data) resp.pbResp.Data = data + resp.detail.CoprCacheHit = true } else { // Cache not hit or cache hit but not valid: update the cache if the response can be cached. if cacheKey != nil && resp.pbResp.CanBeCached && resp.pbResp.CacheLastVersion > 0 { @@ -1081,6 +1082,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon type CopRuntimeStats struct { execdetails.ExecDetails RegionRequestRuntimeStats + + CoprCacheHit bool } func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error { From 15e026cee17f7494321b6f1196bfa3e8577fcaaf Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Mon, 14 Sep 2020 14:51:46 +0800 Subject: [PATCH 2/2] resolve conflicts --- distsql/distsql_test.go | 26 ------------------- distsql/select_result.go | 56 ++-------------------------------------- 2 files changed, 2 insertions(+), 80 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index a949bf38f5f2d..e65d94224958f 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -156,32 +156,6 @@ func (s *testSuite) TestSelectWithRuntimeStats(c *C) { c.Assert(err, IsNil) } -<<<<<<< HEAD -======= -func (s *testSuite) TestSelectResultRuntimeStats(c *C) { - basic := &execdetails.BasicRuntimeStats{} - basic.Record(time.Second, 20) - s1 := &selectResultRuntimeStats{ - copRespTime: []time.Duration{time.Second, time.Millisecond}, - procKeys: []int64{100, 200}, - backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond}, - totalProcessTime: time.Second, - totalWaitTime: time.Second, - rpcStat: tikv.NewRegionRequestRuntimeStats(), - } - s2 := *s1 - stmtStats := execdetails.NewRuntimeStatsColl() - stmtStats.RegisterStats(1, basic) - stmtStats.RegisterStats(1, s1) - stmtStats.RegisterStats(1, &s2) - stats := stmtStats.GetRootStats(1) - expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}" - c.Assert(stats.String(), Equals, expect) - // Test for idempotence. - c.Assert(stats.String(), Equals, expect) -} - ->>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948) func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) { request, err := (&RequestBuilder{}).SetKeyRanges(nil). SetDAGRequest(&tipb.DAGRequest{}). diff --git a/distsql/select_result.go b/distsql/select_result.go index 401afa2219639..300996fddf517 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -344,46 +344,6 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntim } } -<<<<<<< HEAD -======= -func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats { - newRs := selectResultRuntimeStats{ - copRespTime: make([]time.Duration, 0, len(s.copRespTime)), - procKeys: make([]int64, 0, len(s.procKeys)), - backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), - rpcStat: tikv.NewRegionRequestRuntimeStats(), - } - newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...) - newRs.procKeys = append(newRs.procKeys, s.procKeys...) - for k, v := range s.backoffSleep { - newRs.backoffSleep[k] += v - } - newRs.totalProcessTime += s.totalProcessTime - newRs.totalWaitTime += s.totalWaitTime - for k, v := range s.rpcStat.Stats { - newRs.rpcStat.Stats[k] = v - } - return &newRs -} - -func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) { - other, ok := rs.(*selectResultRuntimeStats) - if !ok { - return - } - s.copRespTime = append(s.copRespTime, other.copRespTime...) - s.procKeys = append(s.procKeys, other.procKeys...) - - for k, v := range other.backoffSleep { - s.backoffSleep[k] += v - } - s.totalProcessTime += other.totalProcessTime - s.totalWaitTime += other.totalWaitTime - s.rpcStat.Merge(other.rpcStat) - s.CoprCacheHitNum += other.CoprCacheHitNum -} - ->>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948) func (s *selectResultRuntimeStats) String() string { buf := bytes.NewBuffer(nil) if s.RuntimeStats != nil { @@ -427,20 +387,6 @@ func (s *selectResultRuntimeStats) String() string { } } } -<<<<<<< HEAD -======= - copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] - if copRPC != nil && copRPC.Count > 0 { - delete(s.rpcStat.Stats, tikvrpc.CmdCop) - buf.WriteString(", rpc_num: ") - buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) - buf.WriteString(", rpc_time: ") - buf.WriteString(time.Duration(copRPC.Consume).String()) - } - buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v", - strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64))) - buf.WriteString("}") ->>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948) } copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] if copRPC != nil && copRPC.Count > 0 { @@ -449,6 +395,8 @@ func (s *selectResultRuntimeStats) String() string { buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) buf.WriteString(", rpc_time: ") buf.WriteString(time.Duration(copRPC.Consume).String()) + buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v", + strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64))) } buf.WriteString("}")