diff --git a/internal/client/client.go b/internal/client/client.go index fc832d368..9bd7fd515 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -570,7 +570,7 @@ type sendReqCounterCacheValue struct { timeCounter prometheus.Counter } -func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) { +func (c *RPCClient) updateSendReqHistogramAndExecStats(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool, execDetails *util.ExecDetails) { elapsed := time.Since(start) secs := elapsed.Seconds() storeID := req.Context.GetPeer().GetStoreId() @@ -641,6 +641,13 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr latHist.(prometheus.Observer).Observe(latency.Seconds()) } } + + execNetworkCollector := &networkCollector{} + // update execDetails + if execDetails != nil { + execNetworkCollector.onReq(req, execDetails) + execNetworkCollector.onResp(req, resp, execDetails) + } } func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) { @@ -671,11 +678,12 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R staleRead := req.GetStaleRead() defer func() { stmtExec := ctx.Value(util.ExecDetailsKey) + var detail *util.ExecDetails if stmtExec != nil { - detail := stmtExec.(*util.ExecDetails) + detail = stmtExec.(*util.ExecDetails) atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start))) } - c.updateTiKVSendReqHistogram(req, resp, start, staleRead) + c.updateSendReqHistogramAndExecStats(req, resp, start, staleRead, detail) if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) { if si := buildSpanInfoFromResp(resp); si != nil { diff --git a/internal/client/metrics_collector.go b/internal/client/metrics_collector.go new file mode 100644 index 000000000..6484dc069 --- /dev/null +++ 
b/internal/client/metrics_collector.go @@ -0,0 +1,183 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "sync/atomic" + + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/mpp" + "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util" +) + +type staleReadMetricsCollector struct { +} + +func (s *staleReadMetricsCollector) onReq(size float64, isCrossZoneTraffic bool) { + if isCrossZoneTraffic { + metrics.StaleReadRemoteOutBytes.Add(float64(size)) + metrics.StaleReadReqCrossZoneCounter.Add(1) + + } else { + metrics.StaleReadLocalOutBytes.Add(float64(size)) + metrics.StaleReadReqLocalCounter.Add(1) + } +} + +func (s *staleReadMetricsCollector) onResp(size float64, isCrossZoneTraffic bool) { + if isCrossZoneTraffic { + metrics.StaleReadRemoteInBytes.Add(float64(size)) + } else { + metrics.StaleReadLocalInBytes.Add(float64(size)) + } +} + +type networkCollector struct { + staleReadMetricsCollector +} + +func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) { + if req == nil { + return + } + size := 0 + switch req.Type { + case tikvrpc.CmdGet: + size = req.Get().Size() + case tikvrpc.CmdBatchGet: + size = req.BatchGet().Size() + case tikvrpc.CmdScan: + size = req.Scan().Size() + case tikvrpc.CmdCop: + size 
= req.Cop().Size() + case tikvrpc.CmdPrewrite: + size = req.Prewrite().Size() + case tikvrpc.CmdCommit: + size = req.Commit().Size() + case tikvrpc.CmdPessimisticLock: + size = req.PessimisticLock().Size() + case tikvrpc.CmdPessimisticRollback: + size = req.PessimisticRollback().Size() + case tikvrpc.CmdBatchRollback: + size = req.BatchRollback().Size() + case tikvrpc.CmdCheckSecondaryLocks: + size = req.CheckSecondaryLocks().Size() + case tikvrpc.CmdScanLock: + size = req.ScanLock().Size() + case tikvrpc.CmdResolveLock: + size = req.ResolveLock().Size() + case tikvrpc.CmdFlush: + size = req.Flush().Size() + case tikvrpc.CmdCheckTxnStatus: + size = req.CheckTxnStatus().Size() + case tikvrpc.CmdMPPTask: + size = req.DispatchMPPTask().Size() + default: + // ignore others + return + } + size += req.Context.Size() + isTiflashTarget := req.StoreTp == tikvrpc.TiFlash + var total, crossZone *int64 + if isTiflashTarget { + total = &details.UnpackedBytesSentMPPTotal + crossZone = &details.UnpackedBytesSentMPPCrossZone + } else { + total = &details.UnpackedBytesSentKVTotal + crossZone = &details.UnpackedBytesSentKVCrossZone + } + + atomic.AddInt64(total, int64(size)) + isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone + if isCrossZoneTraffic { + atomic.AddInt64(crossZone, int64(size)) + } + // stale read metrics + if req.StaleRead { + s.staleReadMetricsCollector.onReq(float64(size), isCrossZoneTraffic) + } +} + +func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response, details *util.ExecDetails) { + if resp == nil { + return + } + size := 0 + switch req.Type { + case tikvrpc.CmdGet: + size += resp.Resp.(*kvrpcpb.GetResponse).Size() + case tikvrpc.CmdBatchGet: + size += resp.Resp.(*kvrpcpb.BatchGetResponse).Size() + case tikvrpc.CmdScan: + size += resp.Resp.(*kvrpcpb.ScanResponse).Size() + case tikvrpc.CmdCop: + size += resp.Resp.(*coprocessor.Response).Size() + case tikvrpc.CmdPrewrite: + size += resp.Resp.(*kvrpcpb.PrewriteResponse).Size() 
+ case tikvrpc.CmdCommit: + size += resp.Resp.(*kvrpcpb.CommitResponse).Size() + case tikvrpc.CmdPessimisticLock: + size += resp.Resp.(*kvrpcpb.PessimisticLockResponse).Size() + case tikvrpc.CmdPessimisticRollback: + size += resp.Resp.(*kvrpcpb.PessimisticRollbackResponse).Size() + case tikvrpc.CmdBatchRollback: + size += resp.Resp.(*kvrpcpb.BatchRollbackResponse).Size() + case tikvrpc.CmdCheckSecondaryLocks: + size += resp.Resp.(*kvrpcpb.CheckSecondaryLocksResponse).Size() + case tikvrpc.CmdScanLock: + size += resp.Resp.(*kvrpcpb.ScanLockResponse).Size() + case tikvrpc.CmdResolveLock: + size += resp.Resp.(*kvrpcpb.ResolveLockResponse).Size() + case tikvrpc.CmdFlush: + size += resp.Resp.(*kvrpcpb.FlushResponse).Size() + case tikvrpc.CmdCheckTxnStatus: + size += resp.Resp.(*kvrpcpb.CheckTxnStatusResponse).Size() + case tikvrpc.CmdMPPTask: + // if is MPPDataPacket + if resp1, ok := resp.Resp.(*mpp.MPPDataPacket); ok && resp1 != nil { + size += resp1.Size() + } + // if is DispatchTaskResponse + if resp1, ok := resp.Resp.(*mpp.DispatchTaskResponse); ok && resp1 != nil { + size += resp1.Size() + } + default: + // ignore others + return + } + var total, crossZone *int64 + isTiflashTarget := req.StoreTp == tikvrpc.TiFlash + if isTiflashTarget { + total = &details.UnpackedBytesReceivedMPPTotal + crossZone = &details.UnpackedBytesReceivedMPPCrossZone + } else { + total = &details.UnpackedBytesReceivedKVTotal + crossZone = &details.UnpackedBytesReceivedKVCrossZone + } + + atomic.AddInt64(total, int64(size)) + isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone + if isCrossZoneTraffic { + atomic.AddInt64(crossZone, int64(size)) + } + // stale read metrics + if req.StaleRead { + s.staleReadMetricsCollector.onResp(float64(size), isCrossZoneTraffic) + } +} diff --git a/internal/client/metrics_collector_test.go b/internal/client/metrics_collector_test.go new file mode 100644 index 000000000..f5fda8035 --- /dev/null +++ b/internal/client/metrics_collector_test.go @@ 
-0,0 +1,162 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "sync/atomic" + "testing" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util" +) + +func TestNetworkCollectorOnReq(t *testing.T) { + // Initialize the collector and dependencies + collector := &networkCollector{} + + // Create a mock request + // Construct requests + reqs := []*tikvrpc.Request{ + tikvrpc.NewRequest( + tikvrpc.CmdGet, + &kvrpcpb.GetRequest{Key: []byte("key")}, + kvrpcpb.Context{ + BusyThresholdMs: 50, + }, + ), + tikvrpc.NewReplicaReadRequest( + tikvrpc.CmdGet, + &kvrpcpb.GetRequest{Key: []byte("key")}, + kv.ReplicaReadFollower, + nil, + kvrpcpb.Context{ + StaleRead: true, + }, + ), + } + + testCases := []struct { + expectUnpackedBytesSentKV int64 + expectUnpackedBytesSentKVCrossZone int64 + req *tikvrpc.Request + }{ + { + expectUnpackedBytesSentKV: 8, + expectUnpackedBytesSentKVCrossZone: 0, + req: reqs[0], + }, + { + expectUnpackedBytesSentKV: 18, + expectUnpackedBytesSentKVCrossZone: 0, + req: reqs[1], + }, + } + + details := &util.ExecDetails{} + + for _, cas := range testCases { + // Call the method + cas.req.AccessLocation = kv.AccessLocalZone + collector.onReq(cas.req, details) + 
+ // Verify metrics + assert.Equal(t, cas.expectUnpackedBytesSentKV, atomic.LoadInt64(&details.UnpackedBytesSentKVTotal), "Total bytes mismatch") + assert.Equal(t, cas.expectUnpackedBytesSentKVCrossZone, atomic.LoadInt64(&details.UnpackedBytesSentKVCrossZone), "Cross-zone bytes mismatch") + beforeMetric := dto.Metric{} + // Verify stale-read metrics + if cas.req.StaleRead { + assert.NoError(t, metrics.StaleReadLocalOutBytes.Write(&beforeMetric)) + assert.Equal(t, float64(10), beforeMetric.GetCounter().GetValue(), "Stale-read local bytes mismatch") + assert.NoError(t, metrics.StaleReadReqLocalCounter.Write(&beforeMetric)) + assert.Equal(t, float64(1), beforeMetric.GetCounter().GetValue(), "Stale-read local counter mismatch") + } + } +} + +func TestNetworkCollectorOnResp(t *testing.T) { + // Initialize the collector and dependencies + collector := &networkCollector{} + + // Construct requests and responses + reqs := []*tikvrpc.Request{ + tikvrpc.NewRequest( + tikvrpc.CmdGet, + &kvrpcpb.GetRequest{Key: []byte("key")}, + kvrpcpb.Context{}, + ), + tikvrpc.NewReplicaReadRequest( + tikvrpc.CmdGet, + &kvrpcpb.GetRequest{Key: []byte("key")}, + kv.ReplicaReadFollower, + nil, + kvrpcpb.Context{ + StaleRead: true, + }, + ), + } + + resps := []*tikvrpc.Response{ + { + Resp: &kvrpcpb.GetResponse{Value: []byte("value")}, + }, + { + Resp: &kvrpcpb.GetResponse{Value: []byte("stale-value")}, + }, + } + + testCases := []struct { + expectUnpackedBytesReceivedKV int64 + expectUnpackedBytesReceivedKVCrossZone int64 + req *tikvrpc.Request + resp *tikvrpc.Response + }{ + { + expectUnpackedBytesReceivedKV: 7, + expectUnpackedBytesReceivedKVCrossZone: 0, + req: reqs[0], + resp: resps[0], + }, + { + expectUnpackedBytesReceivedKV: 20, + expectUnpackedBytesReceivedKVCrossZone: 0, + req: reqs[1], + resp: resps[1], + }, + } + + details := &util.ExecDetails{} + + for _, cas := range testCases { + // Call the method + cas.req.AccessLocation = kv.AccessLocalZone + collector.onResp(cas.req, 
cas.resp, details) + + // Verify metrics + assert.Equal(t, cas.expectUnpackedBytesReceivedKV, atomic.LoadInt64(&details.UnpackedBytesReceivedKVTotal), "Total bytes mismatch") + assert.Equal(t, cas.expectUnpackedBytesReceivedKVCrossZone, atomic.LoadInt64(&details.UnpackedBytesReceivedKVCrossZone), "Cross-zone bytes mismatch") + + // Verify stale-read metrics if applicable + if cas.req.StaleRead { + metric := dto.Metric{} + assert.NoError(t, metrics.StaleReadLocalInBytes.Write(&metric)) + assert.Equal(t, float64(13), metric.GetCounter().GetValue(), "Stale-read local bytes mismatch") // Stale value size + } + } +} diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d5450b435..b3e71de10 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -790,9 +790,7 @@ func (s *RegionRequestSender) SendReqCtx( } }() - var staleReadCollector *staleReadMetricsCollector if req.StaleRead { - staleReadCollector = &staleReadMetricsCollector{} defer func() { if retryTimes == 0 { metrics.StaleReadHitCounter.Add(1) @@ -844,13 +842,23 @@ func (s *RegionRequestSender) SendReqCtx( resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, retryTimes, err } - - var isLocalTraffic bool - if staleReadCollector != nil && s.replicaSelector != nil && s.replicaSelector.target != nil { - isLocalTraffic = s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) - staleReadCollector.onReq(req, isLocalTraffic) + // patch the access location if it is not set under region request sender. which includes the coprocessor, + // txn relative tikv request. + // note: MPP not use this path. need specified in the MPP layer. 
+ patchAccessLocation := func() { + if s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) { + req.AccessLocation = kv.AccessLocalZone + } else { + req.AccessLocation = kv.AccessCrossZone + } + } + if s.replicaSelector != nil && + s.replicaSelector.target != nil && + req.AccessLocation == kv.AccessUnknown && + len(s.replicaSelector.option.labels) != 0 { + // patch the access location if it is not set under region request sender. + patchAccessLocation() } - logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) s.storeAddr = rpcCtx.Addr @@ -932,9 +940,7 @@ func (s *RegionRequestSender) SendReqCtx( s.replicaSelector.onSendSuccess(req) } } - if staleReadCollector != nil { - staleReadCollector.onResp(req.Type, resp, isLocalTraffic) - } + return resp, rpcCtx, retryTimes, nil } } @@ -1802,56 +1808,6 @@ func (s *RegionRequestSender) validateReadTS(ctx context.Context, req *tikvrpc.R return s.readTSValidator.ValidateReadTS(ctx, readTS, req.StaleRead, &oracle.Option{TxnScope: req.TxnScope}) } -type staleReadMetricsCollector struct { -} - -func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) { - size := 0 - switch req.Type { - case tikvrpc.CmdGet: - size = req.Get().Size() - case tikvrpc.CmdBatchGet: - size = req.BatchGet().Size() - case tikvrpc.CmdScan: - size = req.Scan().Size() - case tikvrpc.CmdCop: - size = req.Cop().Size() - default: - // ignore non-read requests - return - } - size += req.Context.Size() - if isLocalTraffic { - metrics.StaleReadLocalOutBytes.Add(float64(size)) - metrics.StaleReadReqLocalCounter.Add(1) - } else { - metrics.StaleReadRemoteOutBytes.Add(float64(size)) - metrics.StaleReadReqCrossZoneCounter.Add(1) - } -} - -func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) { - size := 0 - switch tp { - case tikvrpc.CmdGet: - size += resp.Resp.(*kvrpcpb.GetResponse).Size() - case 
// AccessLocationType classifies where a request is sent relative to the
// client's zone.
type AccessLocationType byte

// Values of AccessLocationType.
const (
	// AccessUnknown is the zero value: the access location has not been set.
	AccessUnknown AccessLocationType = 0
	// AccessLocalZone marks a request as local-zone traffic.
	AccessLocalZone AccessLocationType = 1
	// AccessCrossZone marks a request as cross-zone traffic.
	AccessCrossZone AccessLocationType = 2
)
// TrafficDetails holds byte counters for traffic sent and received, kept
// separately for KV and MPP and, within each, as a total plus the cross-zone
// share.
type TrafficDetails struct {
	// KV traffic: totals first, then the cross-zone portion.
	UnpackedBytesSentKVTotal         int64
	UnpackedBytesReceivedKVTotal     int64
	UnpackedBytesSentKVCrossZone     int64
	UnpackedBytesReceivedKVCrossZone int64

	// MPP traffic: totals first, then the cross-zone portion.
	UnpackedBytesSentMPPTotal         int64
	UnpackedBytesReceivedMPPTotal     int64
	UnpackedBytesSentMPPCrossZone     int64
	UnpackedBytesReceivedMPPCrossZone int64
}