*: fix stale read ops metric #878

Merged
8 commits merged on Jul 13, 2023
Changes from 6 commits
80 changes: 43 additions & 37 deletions internal/locate/region_request.go
@@ -254,6 +254,7 @@ type replicaSelector struct {
region *Region
regionStore *regionStore
replicas []*replica
option storeSelectorOp
state selectorState
// replicas[targetIdx] is the replica handling the request this time
targetIdx AccessIndex
@@ -643,6 +644,10 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
attempts: 0,
})
}
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
var state selectorState
if !req.ReplicaReadType.IsFollowerRead() {
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
@@ -651,10 +656,6 @@
state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx}
}
} else {
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
state = &accessFollower{
tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed,
isStaleRead: req.StaleRead,
@@ -669,6 +670,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
cachedRegion,
regionStore,
replicas,
option,
state,
-1,
-1,
@@ -999,9 +1001,14 @@ func (s *RegionRequestSender) SendReqCtx(

var staleReadCollector *staleReadMetricsCollector
if req.StaleRead {
staleReadCollector = &staleReadMetricsCollector{hit: true}
staleReadCollector.onReq(req)
defer staleReadCollector.collect()
staleReadCollector = &staleReadMetricsCollector{}
defer func() {
if tryTimes == 0 {
metrics.StaleReadHitCounter.Add(1)
} else {
metrics.StaleReadMissCounter.Add(1)
}
}()
}

for {
@@ -1010,9 +1017,6 @@
if tryTimes%100 == 0 {
logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", tryTimes))
}
if req.StaleRead && staleReadCollector != nil {
staleReadCollector.hit = false
}
}

rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
@@ -1040,6 +1044,14 @@
return resp, nil, err
}

var isLocalTraffic bool
if staleReadCollector != nil && s.replicaSelector != nil {
if target := s.replicaSelector.targetReplica(); target != nil {
isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.option.labels)
staleReadCollector.onReq(req, isLocalTraffic)
Contributor:

Should we move the statistics code to after s.sendReqToRegion finishes with no RPC error?

Contributor Author:

No, since staleReadCollector.onResp will be called after the RPC finishes.

Contributor Author:

And we always need to record request QPS and traffic regardless of whether the request succeeds.

}
}

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
s.storeAddr = rpcCtx.Addr
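
The review thread above hinges on ordering: onReq runs before the RPC is sent, so request QPS and outgoing bytes are counted even when the RPC later fails, while onResp only runs once a response has actually arrived. Below is a minimal, self-contained sketch of that control flow; the collector type and the sendOnce helper are simplified stand-ins, not the client-go API.

```go
package main

import (
	"errors"
	"fmt"
)

// collector is a simplified stand-in for staleReadMetricsCollector.
type collector struct{}

func (c *collector) onReq(size int, local bool)  { fmt.Printf("request: bytes=%d local=%v\n", size, local) }
func (c *collector) onResp(size int, local bool) { fmt.Printf("response: bytes=%d local=%v\n", size, local) }

// sendOnce mirrors the ordering in SendReqCtx: request-side metrics are recorded
// before the RPC, response-side metrics only after the RPC returns successfully.
func sendOnce(c *collector, reqSize int, local bool, rpc func() (respSize int, err error)) error {
	c.onReq(reqSize, local) // counted regardless of whether the RPC succeeds
	respSize, err := rpc()
	if err != nil {
		return err // nothing to count on an RPC error
	}
	c.onResp(respSize, local)
	return nil
}

func main() {
	c := &collector{}
	_ = sendOnce(c, 128, true, func() (int, error) { return 512, nil })
	_ = sendOnce(c, 128, false, func() (int, error) { return 0, errors.New("rpc failed") })
}
```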

@@ -1091,7 +1103,7 @@
}
}
if staleReadCollector != nil {
staleReadCollector.onResp(resp)
staleReadCollector.onResp(req.Type, resp, isLocalTraffic)
}
return resp, rpcCtx, nil
}
@@ -1666,35 +1678,36 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}

type staleReadMetricsCollector struct {
tp tikvrpc.CmdType
hit bool
out int
in int
}

func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += req.Get().Size()
size = req.Get().Size()
case tikvrpc.CmdBatchGet:
size += req.BatchGet().Size()
size = req.BatchGet().Size()
case tikvrpc.CmdScan:
size += req.Scan().Size()
size = req.Scan().Size()
case tikvrpc.CmdCop:
size += req.Cop().Size()
size = req.Cop().Size()
default:
// ignore non-read requests
return
}
s.tp = req.Type
size += req.Context.Size()
s.out = size
size += +req.Context.Size()
if isLocalTraffic {
metrics.StaleReadLocalOutBytes.Add(float64(size))
metrics.StaleReadReqLocalCounter.Add(1)
} else {
metrics.StaleReadRemoteOutBytes.Add(float64(size))
metrics.StaleReadReqCrossZoneCounter.Add(1)
}
}

func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) {
size := 0
switch s.tp {
switch tp {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
@@ -1704,19 +1717,12 @@ func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
default:
// unreachable
// ignore non-read requests
return
}
s.in = size
}

func (s *staleReadMetricsCollector) collect() {
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
if !s.hit {
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
}
if s.in > 0 && s.out > 0 {
in.Observe(float64(s.in))
out.Observe(float64(s.out))
if isLocalTraffic {
metrics.StaleReadLocalInBytes.Add(float64(size))
} else {
metrics.StaleReadRemoteInBytes.Add(float64(size))
}
}
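
The hit/miss side of the metric no longer lives in the collector (the old hit field and collect() are removed above); instead SendReqCtx defers a check on the retry counter, so a stale read that succeeds on the first attempt counts as a hit and anything that needed a retry counts as a miss. A stripped-down sketch of that pattern, with plain integers standing in for metrics.StaleReadHitCounter and metrics.StaleReadMissCounter:

```go
package main

import "fmt"

// hits and misses stand in for metrics.StaleReadHitCounter / StaleReadMissCounter.
var hits, misses int

// serve mimics the deferred accounting in SendReqCtx: the deferred closure reads
// tryTimes when the function returns, so any retry flips the result to a miss.
func serve(staleRead bool, attemptsNeeded int) {
	tryTimes := 0
	if staleRead {
		defer func() {
			if tryTimes == 0 {
				hits++
			} else {
				misses++
			}
		}()
	}
	for {
		if tryTimes >= attemptsNeeded-1 { // pretend this attempt succeeds
			return
		}
		tryTimes++
	}
}

func main() {
	serve(true, 1) // succeeds on the first attempt -> hit
	serve(true, 3) // needs two retries -> miss
	fmt.Println("hits:", hits, "misses:", misses) // hits: 1 misses: 1
}
```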
30 changes: 25 additions & 5 deletions metrics/metrics.go
@@ -96,7 +96,9 @@ var (
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVStaleReadSizeSummary *prometheus.SummaryVec
TiKVStaleReadCounter *prometheus.CounterVec
TiKVStaleReadReqCounter *prometheus.CounterVec
TiKVStaleReadBytes *prometheus.CounterVec
)

// Label constants.
@@ -591,12 +593,28 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of assertions used in prewrite requests",
}, []string{LblType})

TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
TiKVStaleReadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_counter",
Help: "Counter of stale read hit/miss",
}, []string{LblResult})

TiKVStaleReadReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_req_counter",
Help: "Counter of stale read requests",
}, []string{LblType})

TiKVStaleReadBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_bytes",
Help: "Size of stale read.",
Help: "Counter of stale read requests bytes",
}, []string{LblResult, LblDirection})

initShortcuts()
@@ -669,7 +687,9 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVReadThroughput)
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
prometheus.MustRegister(TiKVStaleReadSizeSummary)
prometheus.MustRegister(TiKVStaleReadCounter)
prometheus.MustRegister(TiKVStaleReadReqCounter)
prometheus.MustRegister(TiKVStaleReadBytes)
}

// readCounter reads the value of a prometheus.Counter.
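
The three new collectors are plain labeled counter vectors rather than a summary. As a freestanding illustration of the client_golang API they rely on (not this repo's initMetrics, whose namespace, subsystem, and label names come from parameters and the LblResult/LblDirection constants), here is how a vector shaped like TiKVStaleReadBytes is created, registered, and bumped through a pre-resolved child, the same pattern shortcuts.go uses:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Shaped like TiKVStaleReadBytes; the namespace, subsystem, and label names
	// here are placeholder literals, not the values the library is configured with.
	staleReadBytes := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "tikv",
			Subsystem: "client_go",
			Name:      "stale_read_bytes",
			Help:      "Counter of stale read requests bytes",
		}, []string{"result", "direction"})
	prometheus.MustRegister(staleReadBytes)

	// Resolving the child once (as shortcuts.go does with WithLabelValues)
	// avoids the label lookup on every Add call in the request path.
	localOut := staleReadBytes.WithLabelValues("local", "out")
	localOut.Add(128)
	localOut.Add(64)

	fmt.Println(`incremented stale_read_bytes{result="local", direction="out"} by 192`)
}
```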
28 changes: 20 additions & 8 deletions metrics/shortcuts.go
@@ -136,10 +136,16 @@ var (
PrewriteAssertionUsageCounterNotExist prometheus.Counter
PrewriteAssertionUsageCounterUnknown prometheus.Counter

StaleReadHitInTraffic prometheus.Observer
StaleReadHitOutTraffic prometheus.Observer
StaleReadMissInTraffic prometheus.Observer
StaleReadMissOutTraffic prometheus.Observer
StaleReadHitCounter prometheus.Counter
StaleReadMissCounter prometheus.Counter

StaleReadReqLocalCounter prometheus.Counter
StaleReadReqCrossZoneCounter prometheus.Counter

StaleReadLocalInBytes prometheus.Counter
StaleReadLocalOutBytes prometheus.Counter
StaleReadRemoteInBytes prometheus.Counter
StaleReadRemoteOutBytes prometheus.Counter
)

func initShortcuts() {
@@ -241,8 +247,14 @@
PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist")
PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown")

StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
StaleReadHitCounter = TiKVStaleReadCounter.WithLabelValues("hit")
StaleReadMissCounter = TiKVStaleReadCounter.WithLabelValues("miss")

StaleReadReqLocalCounter = TiKVStaleReadReqCounter.WithLabelValues("local")
StaleReadReqCrossZoneCounter = TiKVStaleReadReqCounter.WithLabelValues("cross-zone")

StaleReadLocalInBytes = TiKVStaleReadBytes.WithLabelValues("local", "in")
StaleReadLocalOutBytes = TiKVStaleReadBytes.WithLabelValues("local", "out")
StaleReadRemoteInBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "in")
StaleReadRemoteOutBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "out")
}
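
A quick way to sanity-check that the right shortcut is bumped is to read a child counter back with client_golang's testutil.ToFloat64. This is only an illustrative test sketch, not part of the PR (metrics.go already has its own readCounter helper); it assumes a _test.go file placed alongside the metrics package:

```go
package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestStaleReadBytesLabels checks that incrementing one (result, direction)
// child of a stale-read-style counter vector leaves the other children at zero.
func TestStaleReadBytesLabels(t *testing.T) {
	vec := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "stale_read_bytes_test",
		Help: "Counter of stale read requests bytes (test copy)",
	}, []string{"result", "direction"})

	localIn := vec.WithLabelValues("local", "in")
	remoteIn := vec.WithLabelValues("cross-zone", "in")

	localIn.Add(42)

	if got := testutil.ToFloat64(localIn); got != 42 {
		t.Fatalf("local in bytes = %v, want 42", got)
	}
	if got := testutil.ToFloat64(remoteIn); got != 0 {
		t.Fatalf("cross-zone in bytes = %v, want 0", got)
	}
}
```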