Skip to content

Commit

Permalink
store/tikv: remove execdetails dependency (pingcap#24119)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Apr 20, 2021
1 parent e104d81 commit fa39b79
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 168 deletions.
4 changes: 2 additions & 2 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ func (e *InsertRuntimeStat) Clone() execdetails.RuntimeStats {
}
if e.SnapshotRuntimeStats != nil {
snapshotStats := e.SnapshotRuntimeStats.Clone()
newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
newRs.SnapshotRuntimeStats = snapshotStats
}
if e.BasicRuntimeStats != nil {
basicStats := e.BasicRuntimeStats.Clone()
Expand All @@ -1194,7 +1194,7 @@ func (e *InsertRuntimeStat) Merge(other execdetails.RuntimeStats) {
if tmp.SnapshotRuntimeStats != nil {
if e.SnapshotRuntimeStats == nil {
snapshotStats := tmp.SnapshotRuntimeStats.Clone()
e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
e.SnapshotRuntimeStats = snapshotStats
} else {
e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func (e *runtimeStatsWithSnapshot) Clone() execdetails.RuntimeStats {
newRs := &runtimeStatsWithSnapshot{}
if e.SnapshotRuntimeStats != nil {
snapshotStats := e.SnapshotRuntimeStats.Clone()
newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
newRs.SnapshotRuntimeStats = snapshotStats
}
return newRs
}
Expand All @@ -594,7 +594,7 @@ func (e *runtimeStatsWithSnapshot) Merge(other execdetails.RuntimeStats) {
if tmp.SnapshotRuntimeStats != nil {
if e.SnapshotRuntimeStats == nil {
snapshotStats := tmp.SnapshotRuntimeStats.Clone()
e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
e.SnapshotRuntimeStats = snapshotStats
return
}
e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
Expand Down
6 changes: 3 additions & 3 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,19 +542,19 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
}

// MergeScanDetail merges scan details into self.
func (sc *StatementContext) MergeScanDetail(scanDetail *execdetails.ScanDetail) {
func (sc *StatementContext) MergeScanDetail(scanDetail *util.ScanDetail) {
// Currently TiFlash cop task does not fill scanDetail, so need to skip it if scanDetail is nil
if scanDetail == nil {
return
}
if sc.mu.execDetails.ScanDetail == nil {
sc.mu.execDetails.ScanDetail = &execdetails.ScanDetail{}
sc.mu.execDetails.ScanDetail = &util.ScanDetail{}
}
sc.mu.execDetails.ScanDetail.Merge(scanDetail)
}

// MergeTimeDetail merges time details into self.
func (sc *StatementContext) MergeTimeDetail(timeDetail execdetails.TimeDetail) {
func (sc *StatementContext) MergeTimeDetail(timeDetail util.TimeDetail) {
sc.mu.execDetails.TimeDetail.ProcessTime += timeDetail.ProcessTime
sc.mu.execDetails.TimeDetail.WaitTime += timeDetail.WaitTime
}
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/util/execdetails"
)

Expand All @@ -39,7 +40,7 @@ func (s *stmtctxSuit) TestCopTasksDetails(c *C) {
CalleeAddress: fmt.Sprintf("%v", i+1),
BackoffSleep: make(map[string]time.Duration),
BackoffTimes: make(map[string]int),
TimeDetail: execdetails.TimeDetail{
TimeDetail: util.TimeDetail{
ProcessTime: time.Second * time.Duration(i+1),
WaitTime: time.Millisecond * time.Duration(i+1),
},
Expand Down
5 changes: 3 additions & 2 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -152,11 +153,11 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
execDetail := execdetails.ExecDetails{
BackoffTime: time.Millisecond,
RequestCount: 2,
ScanDetail: &execdetails.ScanDetail{
ScanDetail: &util.ScanDetail{
ProcessedKeys: 20001,
TotalKeys: 10000,
},
TimeDetail: execdetails.TimeDetail{
TimeDetail: util.TimeDetail{
ProcessTime: time.Second * time.Duration(2),
WaitTime: time.Minute,
},
Expand Down
4 changes: 2 additions & 2 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,8 +912,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
resp.detail.CalleeAddress = rpcCtx.Addr
}
resp.respTime = costTime
sd := &execdetails.ScanDetail{}
td := execdetails.TimeDetail{}
sd := &util.ScanDetail{}
td := util.TimeDetail{}
if pbDetails := resp.pbResp.ExecDetailsV2; pbDetails != nil {
// Take values in `ExecDetailsV2` first.
if timeDetail := pbDetails.TimeDetail; timeDetail != nil {
Expand Down
34 changes: 12 additions & 22 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/unionstore"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/util/execdetails"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -525,10 +524,10 @@ func (s *KVSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
return
}
if s.mu.stats.scanDetail == nil {
s.mu.stats.scanDetail = &execdetails.ScanDetail{}
s.mu.stats.scanDetail = &util.ScanDetail{}
}
if s.mu.stats.timeDetail == nil {
s.mu.stats.timeDetail = &execdetails.TimeDetail{}
s.mu.stats.timeDetail = &util.TimeDetail{}
}
s.mu.stats.scanDetail.MergeFromScanDetailV2(detail.ScanDetailV2)
s.mu.stats.timeDetail.MergeFromTimeDetail(detail.TimeDetail)
Expand Down Expand Up @@ -706,17 +705,12 @@ type SnapshotRuntimeStats struct {
rpcStats RegionRequestRuntimeStats
backoffSleepMS map[BackoffType]int
backoffTimes map[BackoffType]int
scanDetail *execdetails.ScanDetail
timeDetail *execdetails.TimeDetail
}

// Tp implements the RuntimeStats interface.
func (rs *SnapshotRuntimeStats) Tp() int {
return execdetails.TpSnapshotRuntimeStats
scanDetail *util.ScanDetail
timeDetail *util.TimeDetail
}

// Clone implements the RuntimeStats interface.
func (rs *SnapshotRuntimeStats) Clone() execdetails.RuntimeStats {
func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
newRs := SnapshotRuntimeStats{rpcStats: NewRegionRequestRuntimeStats()}
if rs.rpcStats.Stats != nil {
for k, v := range rs.rpcStats.Stats {
Expand All @@ -737,28 +731,24 @@ func (rs *SnapshotRuntimeStats) Clone() execdetails.RuntimeStats {
}

// Merge implements the RuntimeStats interface.
func (rs *SnapshotRuntimeStats) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*SnapshotRuntimeStats)
if !ok {
return
}
if tmp.rpcStats.Stats != nil {
func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) {
if other.rpcStats.Stats != nil {
if rs.rpcStats.Stats == nil {
rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats, len(tmp.rpcStats.Stats))
rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats, len(other.rpcStats.Stats))
}
rs.rpcStats.Merge(tmp.rpcStats)
rs.rpcStats.Merge(other.rpcStats)
}
if len(tmp.backoffSleepMS) > 0 {
if len(other.backoffSleepMS) > 0 {
if rs.backoffSleepMS == nil {
rs.backoffSleepMS = make(map[BackoffType]int)
}
if rs.backoffTimes == nil {
rs.backoffTimes = make(map[BackoffType]int)
}
for k, v := range tmp.backoffSleepMS {
for k, v := range other.backoffSleepMS {
rs.backoffSleepMS[k] += v
}
for k, v := range tmp.backoffTimes {
for k, v := range other.backoffTimes {
rs.backoffTimes[k] += v
}
}
Expand Down
122 changes: 122 additions & 0 deletions store/tikv/util/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
package util

import (
"bytes"
"fmt"
"math"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util/memory"
)

type commitDetailCtxKeyType struct{}
Expand Down Expand Up @@ -179,3 +185,119 @@ func getUnit(d time.Duration) time.Duration {
}
return time.Nanosecond
}

// ScanDetail contains coprocessor scan detail information.
type ScanDetail struct {
// TotalKeys is the approximate number of MVCC keys meet during scanning. It includes
// deleted versions, but does not include RocksDB tombstone keys.
TotalKeys int64
// ProcessedKeys is the number of user keys scanned from the storage.
// It does not include deleted version or RocksDB tombstone keys.
// For Coprocessor requests, it includes keys that has been filtered out by Selection.
ProcessedKeys int64
// RocksdbDeleteSkippedCount is the total number of deletes and single deletes skipped over during
// iteration, i.e. how many RocksDB tombstones are skipped.
RocksdbDeleteSkippedCount uint64
// RocksdbKeySkippedCount it the total number of internal keys skipped over during iteration.
RocksdbKeySkippedCount uint64
// RocksdbBlockCacheHitCount is the total number of RocksDB block cache hits.
RocksdbBlockCacheHitCount uint64
// RocksdbBlockReadCount is the total number of block reads (with IO).
RocksdbBlockReadCount uint64
// RocksdbBlockReadByte is the total number of bytes from block reads.
RocksdbBlockReadByte uint64
}

// Merge merges scan detail execution details into self.
func (sd *ScanDetail) Merge(scanDetail *ScanDetail) {
atomic.AddInt64(&sd.TotalKeys, scanDetail.TotalKeys)
atomic.AddInt64(&sd.ProcessedKeys, scanDetail.ProcessedKeys)
atomic.AddUint64(&sd.RocksdbDeleteSkippedCount, scanDetail.RocksdbDeleteSkippedCount)
atomic.AddUint64(&sd.RocksdbKeySkippedCount, scanDetail.RocksdbKeySkippedCount)
atomic.AddUint64(&sd.RocksdbBlockCacheHitCount, scanDetail.RocksdbBlockCacheHitCount)
atomic.AddUint64(&sd.RocksdbBlockReadCount, scanDetail.RocksdbBlockReadCount)
atomic.AddUint64(&sd.RocksdbBlockReadByte, scanDetail.RocksdbBlockReadByte)
}

var zeroScanDetail = ScanDetail{}

// String implements the fmt.Stringer interface.
func (sd *ScanDetail) String() string {
if sd == nil || *sd == zeroScanDetail {
return ""
}
buf := bytes.NewBuffer(make([]byte, 0, 16))
buf.WriteString("scan_detail: {")
buf.WriteString("total_process_keys: ")
buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10))
buf.WriteString(", total_keys: ")
buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10))
buf.WriteString(", rocksdb: {")
buf.WriteString("delete_skipped_count: ")
buf.WriteString(strconv.FormatUint(sd.RocksdbDeleteSkippedCount, 10))
buf.WriteString(", key_skipped_count: ")
buf.WriteString(strconv.FormatUint(sd.RocksdbKeySkippedCount, 10))
buf.WriteString(", block: {")
buf.WriteString("cache_hit_count: ")
buf.WriteString(strconv.FormatUint(sd.RocksdbBlockCacheHitCount, 10))
buf.WriteString(", read_count: ")
buf.WriteString(strconv.FormatUint(sd.RocksdbBlockReadCount, 10))
buf.WriteString(", read_byte: ")
buf.WriteString(memory.FormatBytes(int64(sd.RocksdbBlockReadByte)))
buf.WriteString("}}}")
return buf.String()
}

// MergeFromScanDetailV2 merges scan detail from pb into itself.
func (sd *ScanDetail) MergeFromScanDetailV2(scanDetail *kvrpcpb.ScanDetailV2) {
if scanDetail != nil {
sd.TotalKeys += int64(scanDetail.TotalVersions)
sd.ProcessedKeys += int64(scanDetail.ProcessedVersions)
sd.RocksdbDeleteSkippedCount += scanDetail.RocksdbDeleteSkippedCount
sd.RocksdbKeySkippedCount += scanDetail.RocksdbKeySkippedCount
sd.RocksdbBlockCacheHitCount += scanDetail.RocksdbBlockCacheHitCount
sd.RocksdbBlockReadCount += scanDetail.RocksdbBlockReadCount
sd.RocksdbBlockReadByte += scanDetail.RocksdbBlockReadByte
}
}

// TimeDetail contains coprocessor time detail information.
type TimeDetail struct {
// WaitWallTimeMs is the off-cpu wall time which is elapsed in TiKV side. Usually this includes queue waiting time and
// other kind of waitings in series.
ProcessTime time.Duration
// Off-cpu and on-cpu wall time elapsed to actually process the request payload. It does not
// include `wait_wall_time`.
// This field is very close to the CPU time in most cases. Some wait time spend in RocksDB
// cannot be excluded for now, like Mutex wait time, which is included in this field, so that
// this field is called wall time instead of CPU time.
WaitTime time.Duration
}

// String implements the fmt.Stringer interface.
func (td *TimeDetail) String() string {
if td == nil {
return ""
}
buf := bytes.NewBuffer(make([]byte, 0, 16))
if td.ProcessTime > 0 {
buf.WriteString("total_process_time: ")
buf.WriteString(FormatDuration(td.ProcessTime))
}
if td.WaitTime > 0 {
if buf.Len() > 0 {
buf.WriteString(", ")
}
buf.WriteString("total_wait_time: ")
buf.WriteString(FormatDuration(td.WaitTime))
}
return buf.String()
}

// MergeFromTimeDetail merges time detail from pb into itself.
func (td *TimeDetail) MergeFromTimeDetail(timeDetail *kvrpcpb.TimeDetail) {
if timeDetail != nil {
td.WaitTime += time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond
td.ProcessTime += time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond
}
}
Loading

0 comments on commit fa39b79

Please sign in to comment.