mcs, tso: add more metrics on the TSO critical path (#7040)
ref #7011

Add more metrics on the TSO critical path to monitor the duration of its key steps.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato authored Sep 5, 2023

1 parent 1d8f89b commit 7d50755
Showing 6 changed files with 84 additions and 60 deletions.
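
The heart of the change is one pattern repeated across these files: turn a plain histogram into a HistogramVec keyed by keyspace group, and wrap each critical-path step in a start/Observe timer. A minimal, self-contained sketch of that pattern (the metric name and variable names below are illustrative, not the ones the commit registers):

package main

import (
    "fmt"
    "strconv"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// A histogram vector keyed by keyspace group, mirroring the commit's
// tsoHandleDuration change; the metric name is illustrative only.
var handleDuration = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Namespace: "tso",
        Subsystem: "server",
        Name:      "example_handle_duration_seconds",
        Help:      "Bucketed histogram of processing time (s) of handled requests.",
        Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 13), // 0.5ms up to ~2s
    }, []string{"group"})

func main() {
    prometheus.MustRegister(handleDuration)

    // Time a critical-path operation and record it under the group's label.
    var keyspaceGroupID uint32 = 1
    start := time.Now()
    time.Sleep(2 * time.Millisecond) // stand-in for the real TSO handling
    groupLabel := strconv.FormatUint(uint64(keyspaceGroupID), 10)
    handleDuration.WithLabelValues(groupLabel).Observe(time.Since(start).Seconds())

    fmt.Println("observed one sample for group", groupLabel)
}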
pkg/mcs/tso/server/grpc_service.go (12 additions, 5 deletions)
@@ -18,6 +18,7 @@ import (
     "context"
     "io"
     "net/http"
+    "strconv"
     "strings"
     "time"
 
@@ -135,20 +136,26 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
         if s.IsClosed() {
             return status.Errorf(codes.Unknown, "server not started")
         }
-        if request.GetHeader().GetClusterId() != s.clusterID {
+        header := request.GetHeader()
+        clusterID := header.GetClusterId()
+        if clusterID != s.clusterID {
             return status.Errorf(
                 codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
-                s.clusterID, request.GetHeader().GetClusterId())
+                s.clusterID, clusterID)
         }
+        keyspaceID := header.GetKeyspaceId()
+        keyspaceGroupID := header.GetKeyspaceGroupId()
+        dcLocation := request.GetDcLocation()
         count := request.GetCount()
         ts, keyspaceGroupBelongTo, err := s.keyspaceGroupManager.HandleTSORequest(
             ctx,
-            request.Header.KeyspaceId, request.Header.KeyspaceGroupId,
-            request.GetDcLocation(), count)
+            keyspaceID, keyspaceGroupID,
+            dcLocation, count)
         if err != nil {
             return status.Errorf(codes.Unknown, err.Error())
         }
-        tsoHandleDuration.Observe(time.Since(start).Seconds())
+        keyspaceGroupIDStr := strconv.FormatUint(uint64(keyspaceGroupID), 10)
+        tsoHandleDuration.WithLabelValues(keyspaceGroupIDStr).Observe(time.Since(start).Seconds())
         response := &tsopb.TsoResponse{
             Header:    s.header(keyspaceGroupBelongTo),
             Timestamp: &ts,
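
Note that WithLabelValues on the hot path performs label resolution (hashing the label values to find the child series) on every call. One way to shave that cost, sketched here under hypothetical names and not part of the commit, is to cache the resolved observers per group ID:

package groupmetrics

import (
    "strconv"
    "sync"

    "github.com/prometheus/client_golang/prometheus"
)

var (
    // Illustrative metric, not the commit's tsoHandleDuration.
    handleDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "example_handle_duration_seconds",
        Help:    "Processing time (s) per keyspace group.",
        Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
    }, []string{"group"})

    observers sync.Map // uint32 -> prometheus.Observer
)

// observerFor returns a cached per-group observer, creating it on first use,
// trading one sync.Map lookup for Prometheus's internal label resolution.
func observerFor(groupID uint32) prometheus.Observer {
    if o, ok := observers.Load(groupID); ok {
        return o.(prometheus.Observer)
    }
    o := handleDuration.WithLabelValues(strconv.FormatUint(uint64(groupID), 10))
    actual, _ := observers.LoadOrStore(groupID, o)
    return actual.(prometheus.Observer)
}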
pkg/mcs/tso/server/metrics.go (5 additions, 28 deletions)
@@ -16,12 +16,9 @@ package server
 
 import "github.com/prometheus/client_golang/prometheus"
 
-const (
-    namespace = "tso"
-)
+const namespace = "tso"
 
 var (
-    // TODO: pre-allocate gauge metrics
     timeJumpBackCounter = prometheus.NewCounter(
         prometheus.CounterOpts{
             Namespace: namespace,
@@ -30,7 +27,7 @@ var (
             Help:      "Counter of system time jumps backward.",
         })
 
-    metadataGauge = prometheus.NewGaugeVec(
+    metaDataGauge = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: "cluster",
@@ -46,39 +43,19 @@ var (
             Help:      "Indicate the tso server info, and the value is the start timestamp (s).",
         }, []string{"version", "hash"})
 
-    tsoProxyHandleDuration = prometheus.NewHistogram(
-        prometheus.HistogramOpts{
-            Namespace: namespace,
-            Subsystem: "server",
-            Name:      "handle_tso_proxy_duration_seconds",
-            Help:      "Bucketed histogram of processing time (s) of handled tso proxy requests.",
-            Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 13),
-        })
-
-    tsoProxyBatchSize = prometheus.NewHistogram(
-        prometheus.HistogramOpts{
-            Namespace: namespace,
-            Subsystem: "server",
-            Name:      "handle_tso_proxy_batch_size",
-            Help:      "Bucketed histogram of the batch size of handled tso proxy requests.",
-            Buckets:   prometheus.ExponentialBuckets(1, 2, 13),
-        })
-
-    tsoHandleDuration = prometheus.NewHistogram(
+    tsoHandleDuration = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
             Namespace: namespace,
             Subsystem: "server",
             Name:      "handle_tso_duration_seconds",
             Help:      "Bucketed histogram of processing time (s) of handled tso requests.",
             Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 13),
-        })
+        }, []string{"group"})
 )
 
 func init() {
     prometheus.MustRegister(timeJumpBackCounter)
-    prometheus.MustRegister(metadataGauge)
+    prometheus.MustRegister(metaDataGauge)
     prometheus.MustRegister(serverInfo)
-    prometheus.MustRegister(tsoProxyHandleDuration)
-    prometheus.MustRegister(tsoProxyBatchSize)
     prometheus.MustRegister(tsoHandleDuration)
 }
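
Because tsoHandleDuration is now a vec, each observed group label materializes its own child time series. A test-style sketch of how one might sanity-check that (a local fixture assuming the prometheus testutil helpers, not the package's actual tests):

package server_test // illustrative package name

import (
    "testing"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func TestHandleDurationHasGroupLabel(t *testing.T) {
    // Local fixture mirroring the vec defined above.
    h := prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "handle_tso_duration_seconds",
        Help:    "test fixture",
        Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
    }, []string{"group"})
    h.WithLabelValues("0").Observe(0.001)
    h.WithLabelValues("1").Observe(0.002)

    // One child series per observed group label.
    if got := testutil.CollectAndCount(h); got != 2 {
        t.Fatalf("expected 2 series, got %d", got)
    }
}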
pkg/mcs/tso/server/server.go (1 addition, 1 deletion)
@@ -345,7 +345,7 @@ func (s *Server) startServer() (err error) {
     log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID))
 
     // It may lose accuracy if use float64 to store uint64. So we store the cluster id in label.
-    metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", s.clusterID)).Set(0)
+    metaDataGauge.WithLabelValues(fmt.Sprintf("cluster%d", s.clusterID)).Set(0)
     // The independent TSO service still reuses PD version info since PD and TSO are just
     // different service modes provided by the same pd-server binary
     serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))
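
The comment above is the reason the cluster ID goes into a label rather than the gauge's value: a float64 mantissa has 53 bits, so large uint64 IDs cannot all be represented exactly. A quick demonstration:

package main

import "fmt"

func main() {
    // float64 has a 53-bit mantissa, so uint64 values above 2^53 lose
    // precision when stored as a gauge value; hence the label trick.
    id := uint64(1<<53) + 1
    fmt.Println(id, uint64(float64(id))) // 9007199254740993 9007199254740992
}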
pkg/tso/keyspace_group_manager.go (12 additions, 5 deletions)
@@ -190,7 +190,7 @@ func (s *state) markGroupRequested(groupID uint32, checker func() error) error {
     return nil
 }
 
-func (s *state) checkTSOSplit(
+func (s *state) checkGroupSplit(
     targetGroupID uint32,
 ) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
     s.RLock()
@@ -212,7 +212,7 @@
 
 // Reject any request if the keyspace group is in merging state,
 // we need to wait for the merging checker to finish the TSO merging.
-func (s *state) checkTSOMerge(
+func (s *state) checkGroupMerge(
     groupID uint32,
 ) error {
     s.RLock()
@@ -1066,7 +1066,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
     if err != nil {
         return pdpb.Timestamp{}, curKeyspaceGroupID, err
     }
-    err = kgm.state.checkTSOMerge(curKeyspaceGroupID)
+    err = kgm.state.checkGroupMerge(curKeyspaceGroupID)
     if err != nil {
         return pdpb.Timestamp{}, curKeyspaceGroupID, err
     }
@@ -1156,7 +1156,7 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
     keyspaceGroupID uint32,
     dcLocation string,
 ) error {
-    splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID)
+    splitTargetAM, splitSourceAM, err := kgm.state.checkGroupSplit(keyspaceGroupID)
     if err != nil || splitTargetAM == nil {
         return err
     }
@@ -1209,6 +1209,7 @@ const keyspaceGroupsAPIPrefix = "/pd/api/v2/tso/keyspace-groups"
 
 // Put the code below into the critical section to prevent from sending too many HTTP requests.
 func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
+    start := time.Now()
     kgm.Lock()
     defer kgm.Unlock()
     // Check if the keyspace group is in split state.
@@ -1220,6 +1221,7 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
     if kgm.httpClient == nil {
         return nil
     }
+    startRequest := time.Now()
     statusCode, err := apiutil.DoDelete(
         kgm.httpClient,
         kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/split", id))
@@ -1232,17 +1234,20 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
             zap.Int("status-code", statusCode))
         return errs.ErrSendRequest.FastGenByArgs()
     }
+    kgm.metrics.finishSplitSendDuration.Observe(time.Since(startRequest).Seconds())
     // Pre-update the split keyspace group's split state in memory.
     // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
     // For now, we only have scenarios to update split state/merge state, and the other fields are always
     // loaded from etcd without any modification, so we can simply copy the group and replace the state.
     newSplitGroup := *splitGroup
     newSplitGroup.SplitState = nil
     kgm.kgs[id] = &newSplitGroup
+    kgm.metrics.finishSplitDuration.Observe(time.Since(start).Seconds())
     return nil
 }
 
 func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
+    start := time.Now()
     kgm.Lock()
     defer kgm.Unlock()
     // Check if the keyspace group is in the merging state.
@@ -1254,6 +1259,7 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
     if kgm.httpClient == nil {
         return nil
     }
+    startRequest := time.Now()
     statusCode, err := apiutil.DoDelete(
         kgm.httpClient,
         kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/merge", id))
@@ -1266,14 +1272,15 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
             zap.Int("status-code", statusCode))
         return errs.ErrSendRequest.FastGenByArgs()
     }
-
+    kgm.metrics.finishMergeSendDuration.Observe(time.Since(startRequest).Seconds())
     // Pre-update the merge target keyspace group's merge state in memory.
     // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
     // For now, we only have scenarios to update split state/merge state, and the other fields are always
     // loaded from etcd without any modification, so we can simply copy the group and replace the state.
     newTargetGroup := *mergeTarget
     newTargetGroup.MergeState = nil
     kgm.kgs[id] = &newTargetGroup
+    kgm.metrics.finishMergeDuration.Observe(time.Since(start).Seconds())
     return nil
 }
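
Both finish functions now carry two timers: start covers the whole locked critical section including lock acquisition, while startRequest isolates just the HTTP round trip inside it, so lock contention and network latency can be told apart on a dashboard. A minimal sketch of that two-timer pattern, with hypothetical names and a stub server standing in for the real API:

package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "sync"
    "time"
)

type manager struct {
    mu sync.Mutex
}

// finishOp mirrors the commit's pattern: one timer for the whole locked
// section, another for just the outbound request inside it.
func (m *manager) finishOp(url string) error {
    start := time.Now()
    m.mu.Lock()
    defer m.mu.Unlock()

    startRequest := time.Now()
    resp, err := http.Get(url) // stand-in for apiutil.DoDelete
    if err != nil {
        return err
    }
    resp.Body.Close()
    fmt.Printf("send took %v\n", time.Since(startRequest))

    // ... the in-memory state update would go here ...
    fmt.Printf("whole critical section took %v\n", time.Since(start))
    return nil
}

func main() {
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
    defer srv.Close()
    (&manager{}).finishOp(srv.URL)
}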

pkg/tso/metrics.go (48 additions, 21 deletions)
@@ -17,40 +17,51 @@ package tso
 import "github.com/prometheus/client_golang/prometheus"
 
 const (
-    dcLabel    = "dc"
-    typeLabel  = "type"
-    groupLabel = "group"
+    pdNamespace  = "pd"
+    tsoNamespace = "tso"
+    dcLabel      = "dc"
+    typeLabel    = "type"
+    groupLabel   = "group"
 )
 
 var (
     // TSO metrics
     tsoCounter = prometheus.NewCounterVec(
         prometheus.CounterOpts{
-            Namespace: "pd",
+            Namespace: pdNamespace,
             Subsystem: "tso",
             Name:      "events",
             Help:      "Counter of tso events",
         }, []string{typeLabel, groupLabel, dcLabel})
 
     tsoGauge = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
-            Namespace: "pd",
+            Namespace: pdNamespace,
             Subsystem: "cluster",
             Name:      "tso",
             Help:      "Record of tso metadata.",
         }, []string{typeLabel, groupLabel, dcLabel})
 
     tsoGap = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
-            Namespace: "pd",
+            Namespace: pdNamespace,
             Subsystem: "cluster",
             Name:      "tso_gap_millionseconds",
             Help:      "The minimal (non-zero) TSO gap for each DC.",
         }, []string{groupLabel, dcLabel})
 
+    tsoOpDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: pdNamespace,
+            Subsystem: "cluster",
+            Name:      "tso_operation_duration_seconds",
+            Help:      "Bucketed histogram of processing time(s) of the TSO operations.",
+            Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 13),
+        }, []string{typeLabel, groupLabel, dcLabel})
+
     tsoAllocatorRole = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
-            Namespace: "pd",
+            Namespace: pdNamespace,
             Subsystem: "tso",
             Name:      "role",
             Help:      "Indicate the PD server role info, whether it's a TSO allocator.",
@@ -59,15 +70,15 @@ var (
     // Keyspace Group metrics
     keyspaceGroupStateGauge = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
-            Namespace: "pd",
+            Namespace: tsoNamespace,
             Subsystem: "keyspace_group",
             Name:      "state",
             Help:      "Gauge of the Keyspace Group states.",
         }, []string{typeLabel})
 
     keyspaceGroupOpDuration = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
-            Namespace: "pd",
+            Namespace: tsoNamespace,
             Subsystem: "keyspace_group",
             Name:      "operation_duration_seconds",
             Help:      "Bucketed histogram of processing time(s) of the Keyspace Group operations.",
@@ -79,6 +90,7 @@ func init() {
     prometheus.MustRegister(tsoCounter)
     prometheus.MustRegister(tsoGauge)
    prometheus.MustRegister(tsoGap)
+    prometheus.MustRegister(tsoOpDuration)
     prometheus.MustRegister(tsoAllocatorRole)
     prometheus.MustRegister(keyspaceGroupStateGauge)
     prometheus.MustRegister(keyspaceGroupOpDuration)
@@ -104,6 +116,10 @@ type tsoMetrics struct {
     notLeaderAnymoreEvent prometheus.Counter
     logicalOverflowEvent  prometheus.Counter
     exceededMaxRetryEvent prometheus.Counter
+    // timestampOracle operation duration
+    syncSaveDuration   prometheus.Observer
+    resetSaveDuration  prometheus.Observer
+    updateSaveDuration prometheus.Observer
     // allocator event counter
     notLeaderEvent     prometheus.Counter
     globalTSOSyncEvent prometheus.Counter
@@ -137,6 +153,9 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics {
     notLeaderAnymoreEvent: tsoCounter.WithLabelValues("not_leader_anymore", groupID, dcLocation),
     logicalOverflowEvent:  tsoCounter.WithLabelValues("logical_overflow", groupID, dcLocation),
     exceededMaxRetryEvent: tsoCounter.WithLabelValues("exceeded_max_retry", groupID, dcLocation),
+    syncSaveDuration:      tsoOpDuration.WithLabelValues("sync_save", groupID, dcLocation),
+    resetSaveDuration:     tsoOpDuration.WithLabelValues("reset_save", groupID, dcLocation),
+    updateSaveDuration:    tsoOpDuration.WithLabelValues("update_save", groupID, dcLocation),
     notLeaderEvent:        tsoCounter.WithLabelValues("not_leader", groupID, dcLocation),
     globalTSOSyncEvent:    tsoCounter.WithLabelValues("global_tso_sync", groupID, dcLocation),
     globalTSOEstimateEvent: tsoCounter.WithLabelValues("global_tso_estimate", groupID, dcLocation),
@@ -150,21 +169,29 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics {
 }
 
 type keyspaceGroupMetrics struct {
-    splitSourceGauge prometheus.Gauge
-    splitTargetGauge prometheus.Gauge
-    mergeSourceGauge prometheus.Gauge
-    mergeTargetGauge prometheus.Gauge
-    splitDuration    prometheus.Observer
-    mergeDuration    prometheus.Observer
+    splitSourceGauge        prometheus.Gauge
+    splitTargetGauge        prometheus.Gauge
+    mergeSourceGauge        prometheus.Gauge
+    mergeTargetGauge        prometheus.Gauge
+    splitDuration           prometheus.Observer
+    mergeDuration           prometheus.Observer
+    finishSplitSendDuration prometheus.Observer
+    finishSplitDuration     prometheus.Observer
+    finishMergeSendDuration prometheus.Observer
+    finishMergeDuration     prometheus.Observer
 }
 
 func newKeyspaceGroupMetrics() *keyspaceGroupMetrics {
     return &keyspaceGroupMetrics{
-        splitSourceGauge: keyspaceGroupStateGauge.WithLabelValues("split-source"),
-        splitTargetGauge: keyspaceGroupStateGauge.WithLabelValues("split-target"),
-        mergeSourceGauge: keyspaceGroupStateGauge.WithLabelValues("merge-source"),
-        mergeTargetGauge: keyspaceGroupStateGauge.WithLabelValues("merge-target"),
-        splitDuration:    keyspaceGroupOpDuration.WithLabelValues("split"),
-        mergeDuration:    keyspaceGroupOpDuration.WithLabelValues("merge"),
+        splitSourceGauge:        keyspaceGroupStateGauge.WithLabelValues("split-source"),
+        splitTargetGauge:        keyspaceGroupStateGauge.WithLabelValues("split-target"),
+        mergeSourceGauge:        keyspaceGroupStateGauge.WithLabelValues("merge-source"),
+        mergeTargetGauge:        keyspaceGroupStateGauge.WithLabelValues("merge-target"),
+        splitDuration:           keyspaceGroupOpDuration.WithLabelValues("split"),
+        mergeDuration:           keyspaceGroupOpDuration.WithLabelValues("merge"),
+        finishSplitSendDuration: keyspaceGroupOpDuration.WithLabelValues("finish-split-send"),
+        finishSplitDuration:     keyspaceGroupOpDuration.WithLabelValues("finish-split"),
+        finishMergeSendDuration: keyspaceGroupOpDuration.WithLabelValues("finish-merge-send"),
+        finishMergeDuration:     keyspaceGroupOpDuration.WithLabelValues("finish-merge"),
    }
 }
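
Binding WithLabelValues once in a constructor, as newTSOMetrics and newKeyspaceGroupMetrics do, moves label resolution out of the hot path entirely; the stored prometheus.Observer is a direct handle to the child series. A small sketch of the same idea with hypothetical fields:

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Illustrative vec, not the package's tsoOpDuration.
var opDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
    Name:    "example_operation_duration_seconds",
    Help:    "Processing time (s) of example operations.",
    Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{"type", "group", "dc"})

// exampleMetrics pre-binds observers once, so hot-path code pays no
// label-resolution cost; this mirrors the newTSOMetrics constructor pattern.
type exampleMetrics struct {
    syncSaveDuration   prometheus.Observer
    updateSaveDuration prometheus.Observer
}

func newExampleMetrics(groupID, dcLocation string) *exampleMetrics {
    return &exampleMetrics{
        syncSaveDuration:   opDuration.WithLabelValues("sync_save", groupID, dcLocation),
        updateSaveDuration: opDuration.WithLabelValues("update_save", groupID, dcLocation),
    }
}

func main() {
    prometheus.MustRegister(opDuration)
    m := newExampleMetrics("0", "global")
    start := time.Now()
    // ... do the actual work here ...
    m.syncSaveDuration.Observe(time.Since(start).Seconds())
}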
pkg/tso/tso.go (6 additions, 0 deletions)
@@ -192,10 +192,12 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
         failpoint.Return(errs.ErrEtcdTxnInternal)
     })
     save := next.Add(t.saveInterval)
+    start := time.Now()
     if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
         t.metrics.errSaveSyncTSEvent.Inc()
         return err
     }
+    t.metrics.syncSaveDuration.Observe(time.Since(start).Seconds())
     t.lastSavedTime.Store(save)
 
     t.metrics.syncOKEvent.Inc()
@@ -260,10 +262,12 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadership
     // save into etcd only if nextPhysical is close to lastSavedTime
     if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), nextPhysical) <= UpdateTimestampGuard {
         save := nextPhysical.Add(t.saveInterval)
+        start := time.Now()
         if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
             t.metrics.errSaveResetTSEvent.Inc()
             return err
         }
+        t.metrics.resetSaveDuration.Observe(time.Since(start).Seconds())
         t.lastSavedTime.Store(save)
     }
     // save into memory only if nextPhysical or nextLogical is greater.
@@ -336,6 +340,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
     // The time window needs to be updated and saved to etcd.
     if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard {
         save := next.Add(t.saveInterval)
+        start := time.Now()
         if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
             log.Warn("save timestamp failed",
                 zap.String("dc-location", t.dcLocation),
@@ -344,6 +349,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
             t.metrics.errSaveUpdateTSEvent.Inc()
             return err
         }
+        t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
         t.lastSavedTime.Store(save)
     }
     // save into memory
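
In all three call sites the observation sits after the error return, so failed saves contribute no samples and the histogram reflects only successful etcd round trips. For contrast, a hedged sketch of the deferred variant, which would also time failures (all names here are hypothetical):

package main

import (
    "errors"
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

var saveDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
    Name:    "example_save_duration_seconds",
    Help:    "Time (s) spent saving the timestamp, successes and failures alike.",
    Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

// saveTimestamp demonstrates the deferred variant: unlike the commit,
// the sample is recorded even when the save fails.
func saveTimestamp(fail bool) (err error) {
    defer func(start time.Time) {
        saveDuration.Observe(time.Since(start).Seconds())
    }(time.Now())
    if fail {
        return errors.New("etcd txn failed") // hypothetical failure
    }
    return nil
}

func main() {
    prometheus.MustRegister(saveDuration)
    fmt.Println(saveTimestamp(false), saveTimestamp(true))
}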
