Skip to content

Commit

Permalink
*: fix stale read ops metric (#878)
Browse files Browse the repository at this point in the history
* add metrics

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine metrics

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine metrics

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* remove code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine metrics

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* tiny fix

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* address comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine

Signed-off-by: crazycs520 <crazycs520@gmail.com>

---------

Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 authored Jul 13, 2023
1 parent 7d38d5d commit 1e3aeab
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 49 deletions.
78 changes: 42 additions & 36 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ type replicaSelector struct {
region *Region
regionStore *regionStore
replicas []*replica
labels []*metapb.StoreLabel
state selectorState
// replicas[targetIdx] is the replica handling the request this time
targetIdx AccessIndex
Expand Down Expand Up @@ -643,6 +644,10 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
attempts: 0,
})
}
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
var state selectorState
if !req.ReplicaReadType.IsFollowerRead() {
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
Expand All @@ -651,10 +656,6 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx}
}
} else {
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
state = &accessFollower{
tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed,
isStaleRead: req.StaleRead,
Expand All @@ -669,6 +670,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
cachedRegion,
regionStore,
replicas,
option.labels,
state,
-1,
-1,
Expand Down Expand Up @@ -999,9 +1001,14 @@ func (s *RegionRequestSender) SendReqCtx(

var staleReadCollector *staleReadMetricsCollector
if req.StaleRead {
staleReadCollector = &staleReadMetricsCollector{hit: true}
staleReadCollector.onReq(req)
defer staleReadCollector.collect()
staleReadCollector = &staleReadMetricsCollector{}
defer func() {
if tryTimes == 0 {
metrics.StaleReadHitCounter.Add(1)
} else {
metrics.StaleReadMissCounter.Add(1)
}
}()
}

for {
Expand All @@ -1010,9 +1017,6 @@ func (s *RegionRequestSender) SendReqCtx(
if tryTimes%100 == 0 {
logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", tryTimes))
}
if req.StaleRead && staleReadCollector != nil {
staleReadCollector.hit = false
}
}

rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
Expand Down Expand Up @@ -1040,6 +1044,14 @@ func (s *RegionRequestSender) SendReqCtx(
return resp, nil, err
}

var isLocalTraffic bool
if staleReadCollector != nil && s.replicaSelector != nil {
if target := s.replicaSelector.targetReplica(); target != nil {
isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.labels)
staleReadCollector.onReq(req, isLocalTraffic)
}
}

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
s.storeAddr = rpcCtx.Addr

Expand Down Expand Up @@ -1091,7 +1103,7 @@ func (s *RegionRequestSender) SendReqCtx(
}
}
if staleReadCollector != nil {
staleReadCollector.onResp(resp)
staleReadCollector.onResp(req.Type, resp, isLocalTraffic)
}
return resp, rpcCtx, nil
}
Expand Down Expand Up @@ -1666,35 +1678,36 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}

type staleReadMetricsCollector struct {
tp tikvrpc.CmdType
hit bool
out int
in int
}

func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += req.Get().Size()
size = req.Get().Size()
case tikvrpc.CmdBatchGet:
size += req.BatchGet().Size()
size = req.BatchGet().Size()
case tikvrpc.CmdScan:
size += req.Scan().Size()
size = req.Scan().Size()
case tikvrpc.CmdCop:
size += req.Cop().Size()
size = req.Cop().Size()
default:
// ignore non-read requests
return
}
s.tp = req.Type
size += req.Context.Size()
s.out = size
if isLocalTraffic {
metrics.StaleReadLocalOutBytes.Add(float64(size))
metrics.StaleReadReqLocalCounter.Add(1)
} else {
metrics.StaleReadRemoteOutBytes.Add(float64(size))
metrics.StaleReadReqCrossZoneCounter.Add(1)
}
}

func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) {
size := 0
switch s.tp {
switch tp {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
Expand All @@ -1704,19 +1717,12 @@ func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
default:
// unreachable
// ignore non-read requests
return
}
s.in = size
}

func (s *staleReadMetricsCollector) collect() {
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
if !s.hit {
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
}
if s.in > 0 && s.out > 0 {
in.Observe(float64(s.in))
out.Observe(float64(s.out))
if isLocalTraffic {
metrics.StaleReadLocalInBytes.Add(float64(size))
} else {
metrics.StaleReadRemoteInBytes.Add(float64(size))
}
}
30 changes: 25 additions & 5 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ var (
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVStaleReadSizeSummary *prometheus.SummaryVec
TiKVStaleReadCounter *prometheus.CounterVec
TiKVStaleReadReqCounter *prometheus.CounterVec
TiKVStaleReadBytes *prometheus.CounterVec
)

// Label constants.
Expand Down Expand Up @@ -591,12 +593,28 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of assertions used in prewrite requests",
}, []string{LblType})

TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
TiKVStaleReadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_counter",
Help: "Counter of stale read hit/miss",
}, []string{LblResult})

TiKVStaleReadReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_req_counter",
Help: "Counter of stale read requests",
}, []string{LblType})

TiKVStaleReadBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_bytes",
Help: "Size of stale read.",
Help: "Counter of stale read requests bytes",
}, []string{LblResult, LblDirection})

initShortcuts()
Expand Down Expand Up @@ -669,7 +687,9 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVReadThroughput)
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
prometheus.MustRegister(TiKVStaleReadSizeSummary)
prometheus.MustRegister(TiKVStaleReadCounter)
prometheus.MustRegister(TiKVStaleReadReqCounter)
prometheus.MustRegister(TiKVStaleReadBytes)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
28 changes: 20 additions & 8 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,16 @@ var (
PrewriteAssertionUsageCounterNotExist prometheus.Counter
PrewriteAssertionUsageCounterUnknown prometheus.Counter

StaleReadHitInTraffic prometheus.Observer
StaleReadHitOutTraffic prometheus.Observer
StaleReadMissInTraffic prometheus.Observer
StaleReadMissOutTraffic prometheus.Observer
StaleReadHitCounter prometheus.Counter
StaleReadMissCounter prometheus.Counter

StaleReadReqLocalCounter prometheus.Counter
StaleReadReqCrossZoneCounter prometheus.Counter

StaleReadLocalInBytes prometheus.Counter
StaleReadLocalOutBytes prometheus.Counter
StaleReadRemoteInBytes prometheus.Counter
StaleReadRemoteOutBytes prometheus.Counter
)

func initShortcuts() {
Expand Down Expand Up @@ -241,8 +247,14 @@ func initShortcuts() {
PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist")
PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown")

StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
StaleReadHitCounter = TiKVStaleReadCounter.WithLabelValues("hit")
StaleReadMissCounter = TiKVStaleReadCounter.WithLabelValues("miss")

StaleReadReqLocalCounter = TiKVStaleReadReqCounter.WithLabelValues("local")
StaleReadReqCrossZoneCounter = TiKVStaleReadReqCounter.WithLabelValues("cross-zone")

StaleReadLocalInBytes = TiKVStaleReadBytes.WithLabelValues("local", "in")
StaleReadLocalOutBytes = TiKVStaleReadBytes.WithLabelValues("local", "out")
StaleReadRemoteInBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "in")
StaleReadRemoteOutBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "out")
}

0 comments on commit 1e3aeab

Please sign in to comment.