Skip to content

Commit

Permalink
*: add the traffic statistic (#1505)
Browse files Browse the repository at this point in the history
ref pingcap/tidb#57543

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored Dec 16, 2024
1 parent 06d7f4b commit 0e4728c
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 65 deletions.
14 changes: 11 additions & 3 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ type sendReqCounterCacheValue struct {
timeCounter prometheus.Counter
}

func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) {
func (c *RPCClient) updateSendReqHistogramAndExecStats(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool, execDetails *util.ExecDetails) {
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()
Expand Down Expand Up @@ -641,6 +641,13 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
}

execNetworkCollector := &networkCollector{}
// update execDetails
if execDetails != nil {
execNetworkCollector.onReq(req, execDetails)
execNetworkCollector.onResp(req, resp, execDetails)
}
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
Expand Down Expand Up @@ -671,11 +678,12 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
staleRead := req.GetStaleRead()
defer func() {
stmtExec := ctx.Value(util.ExecDetailsKey)
var detail *util.ExecDetails
if stmtExec != nil {
detail := stmtExec.(*util.ExecDetails)
detail = stmtExec.(*util.ExecDetails)
atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
}
c.updateTiKVSendReqHistogram(req, resp, start, staleRead)
c.updateSendReqHistogramAndExecStats(req, resp, start, staleRead, detail)

if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) {
if si := buildSpanInfoFromResp(resp); si != nil {
Expand Down
183 changes: 183 additions & 0 deletions internal/client/metrics_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"sync/atomic"

"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)

type staleReadMetricsCollector struct {
}

func (s *staleReadMetricsCollector) onReq(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteOutBytes.Add(float64(size))
metrics.StaleReadReqCrossZoneCounter.Add(1)

} else {
metrics.StaleReadLocalOutBytes.Add(float64(size))
metrics.StaleReadReqLocalCounter.Add(1)
}
}

func (s *staleReadMetricsCollector) onResp(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteInBytes.Add(float64(size))
} else {
metrics.StaleReadLocalInBytes.Add(float64(size))
}
}

type networkCollector struct {
staleReadMetricsCollector
}

func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) {
if req == nil {
return
}
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size = req.Get().Size()
case tikvrpc.CmdBatchGet:
size = req.BatchGet().Size()
case tikvrpc.CmdScan:
size = req.Scan().Size()
case tikvrpc.CmdCop:
size = req.Cop().Size()
case tikvrpc.CmdPrewrite:
size = req.Prewrite().Size()
case tikvrpc.CmdCommit:
size = req.Commit().Size()
case tikvrpc.CmdPessimisticLock:
size = req.PessimisticLock().Size()
case tikvrpc.CmdPessimisticRollback:
size = req.PessimisticRollback().Size()
case tikvrpc.CmdBatchRollback:
size = req.BatchRollback().Size()
case tikvrpc.CmdCheckSecondaryLocks:
size = req.CheckSecondaryLocks().Size()
case tikvrpc.CmdScanLock:
size = req.ScanLock().Size()
case tikvrpc.CmdResolveLock:
size = req.ResolveLock().Size()
case tikvrpc.CmdFlush:
size = req.Flush().Size()
case tikvrpc.CmdCheckTxnStatus:
size = req.CheckTxnStatus().Size()
case tikvrpc.CmdMPPTask:
size = req.DispatchMPPTask().Size()
default:
// ignore others
return
}
size += req.Context.Size()
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
var total, crossZone *int64
if isTiflashTarget {
total = &details.UnpackedBytesSentMPPTotal
crossZone = &details.UnpackedBytesSentMPPCrossZone
} else {
total = &details.UnpackedBytesSentKVTotal
crossZone = &details.UnpackedBytesSentKVCrossZone
}

atomic.AddInt64(total, int64(size))
isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onReq(float64(size), isCrossZoneTraffic)
}
}

func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response, details *util.ExecDetails) {
if resp == nil {
return
}
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
size += resp.Resp.(*kvrpcpb.BatchGetResponse).Size()
case tikvrpc.CmdScan:
size += resp.Resp.(*kvrpcpb.ScanResponse).Size()
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
case tikvrpc.CmdPrewrite:
size += resp.Resp.(*kvrpcpb.PrewriteResponse).Size()
case tikvrpc.CmdCommit:
size += resp.Resp.(*kvrpcpb.CommitResponse).Size()
case tikvrpc.CmdPessimisticLock:
size += resp.Resp.(*kvrpcpb.PessimisticLockResponse).Size()
case tikvrpc.CmdPessimisticRollback:
size += resp.Resp.(*kvrpcpb.PessimisticRollbackResponse).Size()
case tikvrpc.CmdBatchRollback:
size += resp.Resp.(*kvrpcpb.BatchRollbackResponse).Size()
case tikvrpc.CmdCheckSecondaryLocks:
size += resp.Resp.(*kvrpcpb.CheckSecondaryLocksResponse).Size()
case tikvrpc.CmdScanLock:
size += resp.Resp.(*kvrpcpb.ScanLockResponse).Size()
case tikvrpc.CmdResolveLock:
size += resp.Resp.(*kvrpcpb.ResolveLockResponse).Size()
case tikvrpc.CmdFlush:
size += resp.Resp.(*kvrpcpb.FlushResponse).Size()
case tikvrpc.CmdCheckTxnStatus:
size += resp.Resp.(*kvrpcpb.CheckTxnStatusResponse).Size()
case tikvrpc.CmdMPPTask:
// if is MPPDataPacket
if resp1, ok := resp.Resp.(*mpp.MPPDataPacket); ok && resp1 != nil {
size += resp1.Size()
}
// if is DispatchTaskResponse
if resp1, ok := resp.Resp.(*mpp.DispatchTaskResponse); ok && resp1 != nil {
size += resp1.Size()
}
default:
// ignore others
return
}
var total, crossZone *int64
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
if isTiflashTarget {
total = &details.UnpackedBytesReceivedMPPTotal
crossZone = &details.UnpackedBytesReceivedMPPCrossZone
} else {
total = &details.UnpackedBytesReceivedKVTotal
crossZone = &details.UnpackedBytesReceivedKVCrossZone
}

atomic.AddInt64(total, int64(size))
isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onResp(float64(size), isCrossZoneTraffic)
}
}
162 changes: 162 additions & 0 deletions internal/client/metrics_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"sync/atomic"
"testing"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)

func TestNetworkCollectorOnReq(t *testing.T) {
// Initialize the collector and dependencies
collector := &networkCollector{}

// Create a mock request
// Construct requests
reqs := []*tikvrpc.Request{
tikvrpc.NewRequest(
tikvrpc.CmdGet,
&kvrpcpb.GetRequest{Key: []byte("key")},
kvrpcpb.Context{
BusyThresholdMs: 50,
},
),
tikvrpc.NewReplicaReadRequest(
tikvrpc.CmdGet,
&kvrpcpb.GetRequest{Key: []byte("key")},
kv.ReplicaReadFollower,
nil,
kvrpcpb.Context{
StaleRead: true,
},
),
}

testCases := []struct {
expectUnpackedBytesSentKV int64
expectUnpackedBytesSentKVCrossZone int64
req *tikvrpc.Request
}{
{
expectUnpackedBytesSentKV: 8,
expectUnpackedBytesSentKVCrossZone: 0,
req: reqs[0],
},
{
expectUnpackedBytesSentKV: 18,
expectUnpackedBytesSentKVCrossZone: 0,
req: reqs[1],
},
}

details := &util.ExecDetails{}

for _, cas := range testCases {
// Call the method
cas.req.AccessLocation = kv.AccessLocalZone
collector.onReq(cas.req, details)

// Verify metrics
assert.Equal(t, cas.expectUnpackedBytesSentKV, atomic.LoadInt64(&details.UnpackedBytesSentKVTotal), "Total bytes mismatch")
assert.Equal(t, cas.expectUnpackedBytesSentKVCrossZone, atomic.LoadInt64(&details.UnpackedBytesSentKVCrossZone), "Cross-zone bytes mismatch")
beforeMetric := dto.Metric{}
// Verify stale-read metrics
if cas.req.StaleRead {
assert.NoError(t, metrics.StaleReadLocalOutBytes.Write(&beforeMetric))
assert.Equal(t, float64(10), beforeMetric.GetCounter().GetValue(), "Stale-read local bytes mismatch")
assert.NoError(t, metrics.StaleReadReqLocalCounter.Write(&beforeMetric))
assert.Equal(t, float64(1), beforeMetric.GetCounter().GetValue(), "Stale-read local counter mismatch")
}
}
}

func TestNetworkCollectorOnResp(t *testing.T) {
// Initialize the collector and dependencies
collector := &networkCollector{}

// Construct requests and responses
reqs := []*tikvrpc.Request{
tikvrpc.NewRequest(
tikvrpc.CmdGet,
&kvrpcpb.GetRequest{Key: []byte("key")},
kvrpcpb.Context{},
),
tikvrpc.NewReplicaReadRequest(
tikvrpc.CmdGet,
&kvrpcpb.GetRequest{Key: []byte("key")},
kv.ReplicaReadFollower,
nil,
kvrpcpb.Context{
StaleRead: true,
},
),
}

resps := []*tikvrpc.Response{
{
Resp: &kvrpcpb.GetResponse{Value: []byte("value")},
},
{
Resp: &kvrpcpb.GetResponse{Value: []byte("stale-value")},
},
}

testCases := []struct {
expectUnpackedBytesReceivedKV int64
expectUnpackedBytesReceivedKVCrossZone int64
req *tikvrpc.Request
resp *tikvrpc.Response
}{
{
expectUnpackedBytesReceivedKV: 7,
expectUnpackedBytesReceivedKVCrossZone: 0,
req: reqs[0],
resp: resps[0],
},
{
expectUnpackedBytesReceivedKV: 20,
expectUnpackedBytesReceivedKVCrossZone: 0,
req: reqs[1],
resp: resps[1],
},
}

details := &util.ExecDetails{}

for _, cas := range testCases {
// Call the method
cas.req.AccessLocation = kv.AccessLocalZone
collector.onResp(cas.req, cas.resp, details)

// Verify metrics
assert.Equal(t, cas.expectUnpackedBytesReceivedKV, atomic.LoadInt64(&details.UnpackedBytesReceivedKVTotal), "Total bytes mismatch")
assert.Equal(t, cas.expectUnpackedBytesReceivedKVCrossZone, atomic.LoadInt64(&details.UnpackedBytesReceivedKVCrossZone), "Cross-zone bytes mismatch")

// Verify stale-read metrics if applicable
if cas.req.StaleRead {
metric := dto.Metric{}
assert.NoError(t, metrics.StaleReadLocalInBytes.Write(&metric))
assert.Equal(t, float64(13), metric.GetCounter().GetValue(), "Stale-read local bytes mismatch") // Stale value size
}
}
}
Loading

0 comments on commit 0e4728c

Please sign in to comment.