store/tikv: move metrics shortcuts to /metrics (#22693)
Signed-off-by: disksing <i@disksing.com>
disksing authored Feb 4, 2021
1 parent 6d53dda commit 4fe72e5
Showing 15 changed files with 251 additions and 171 deletions.
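This commit replaces package-local metric "shortcuts" — variables that pre-bind a labeled Prometheus vector to a fixed label value via WithLabelValues — scattered across store/tikv with exported shortcuts owned by the store/tikv/metrics package. Pre-binding matters on hot paths because resolving a label set on every Inc() or Observe() costs a hash lookup. A minimal sketch of the pattern after the move, assuming the standard prometheus client; the vector definition and names mirror the diff, while namespace and registration details are elided:

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

// A labeled counter vector (the real definition in metrics.go takes
// namespace/subsystem parameters and registers the collector).
var TiKVOnePCTxnCounter = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "one_pc_txn_counter",
        Help: "Counter of 1PC transactions.",
    }, []string{"type"})

// Exported shortcuts, bound once so callers skip the label lookup.
var (
    OnePCTxnCounterOk    prometheus.Counter
    OnePCTxnCounterError prometheus.Counter
)

// initShortcuts rebinds the shortcuts; initMetrics calls it after
// (re)creating the vectors, as the metrics.go hunk below shows.
func initShortcuts() {
    OnePCTxnCounterOk = TiKVOnePCTxnCounter.WithLabelValues("ok")
    OnePCTxnCounterError = TiKVOnePCTxnCounter.WithLabelValues("err")
}
```

Call sites then write metrics.OnePCTxnCounterOk.Inc() instead of keeping a private tikvOnePCTxnCounterOk, which is the bulk of the churn in the files below.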
31 changes: 8 additions & 23 deletions store/tikv/2pc.go
@@ -55,26 +55,11 @@ type twoPhaseCommitAction interface {
 	String() string
 }
 
-var (
-	tikvSecondaryLockCleanupFailureCounterRollback = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback")
-	tiKVTxnHeartBeatHistogramOK = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("ok")
-	tiKVTxnHeartBeatHistogramError = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("err")
-	tikvAsyncCommitTxnCounterOk = metrics.TiKVAsyncCommitTxnCounter.WithLabelValues("ok")
-	tikvAsyncCommitTxnCounterError = metrics.TiKVAsyncCommitTxnCounter.WithLabelValues("err")
-	tikvOnePCTxnCounterOk = metrics.TiKVOnePCTxnCounter.WithLabelValues("ok")
-	tikvOnePCTxnCounterError = metrics.TiKVOnePCTxnCounter.WithLabelValues("err")
-)
-
 // Global variable set by config file.
 var (
 	ManagedLockTTL uint64 = 20000 // 20s
 )
 
-// metricsTag returns detail tag for metrics.
-func metricsTag(action string) string {
-	return "2pc_" + action
-}
-
 // twoPhaseCommitter executes a two-phase commit protocol.
 type twoPhaseCommitter struct {
 	store *KVStore
@@ -766,7 +751,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
 					zap.Uint64("session", c.sessionID),
 					zap.Stringer("action type", action),
 					zap.Error(e))
-				tikvSecondaryLockCleanupFailureCounterCommit.Inc()
+				metrics.SecondaryLockCleanupFailureCounterCommit.Inc()
 			}
 		}()
 	} else {
@@ -907,13 +892,13 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
 			startTime := time.Now()
 			_, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
 			if err != nil {
-				tiKVTxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
+				metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
 				logutil.Logger(bo.ctx).Warn("send TxnHeartBeat failed",
 					zap.Error(err),
 					zap.Uint64("txnStartTS", c.startTS))
 				return
 			}
-			tiKVTxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
+			metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
 		}
 	}
 }
@@ -1046,7 +1031,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
 		cleanupKeysCtx := context.WithValue(context.Background(), TxnStartKey, ctx.Value(TxnStartKey))
 		err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
 		if err != nil {
-			tikvSecondaryLockCleanupFailureCounterRollback.Inc()
+			metrics.SecondaryLockCleanupFailureCounterRollback.Inc()
 			logutil.Logger(ctx).Info("2PC cleanup failed",
 				zap.Error(err),
 				zap.Uint64("txnStartTS", c.startTS))
@@ -1065,19 +1050,19 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 		if c.isOnePC() {
 			// The error means the 1PC transaction failed.
 			if err != nil {
-				tikvOnePCTxnCounterError.Inc()
+				metrics.OnePCTxnCounterError.Inc()
 			} else {
-				tikvOnePCTxnCounterOk.Inc()
+				metrics.OnePCTxnCounterOk.Inc()
 			}
 		} else if c.isAsyncCommit() {
 			// The error means the async commit should not succeed.
 			if err != nil {
 				if c.getUndeterminedErr() == nil {
 					c.cleanup(ctx)
 				}
-				tikvAsyncCommitTxnCounterError.Inc()
+				metrics.AsyncCommitTxnCounterError.Inc()
 			} else {
-				tikvAsyncCommitTxnCounterOk.Inc()
+				metrics.AsyncCommitTxnCounterOk.Inc()
 			}
 		} else {
 			// Always clean up all written keys if the txn does not commit.
27 changes: 8 additions & 19 deletions store/tikv/backoff.go
@@ -45,36 +45,25 @@ const (
 	DecorrJitter
 )
 
-var (
-	tikvBackoffHistogramRPC = metrics.TiKVBackoffHistogram.WithLabelValues("tikvRPC")
-	tikvBackoffHistogramLock = metrics.TiKVBackoffHistogram.WithLabelValues("txnLock")
-	tikvBackoffHistogramLockFast = metrics.TiKVBackoffHistogram.WithLabelValues("tikvLockFast")
-	tikvBackoffHistogramPD = metrics.TiKVBackoffHistogram.WithLabelValues("pdRPC")
-	tikvBackoffHistogramRegionMiss = metrics.TiKVBackoffHistogram.WithLabelValues("regionMiss")
-	tikvBackoffHistogramServerBusy = metrics.TiKVBackoffHistogram.WithLabelValues("serverBusy")
-	tikvBackoffHistogramStaleCmd = metrics.TiKVBackoffHistogram.WithLabelValues("staleCommand")
-	tikvBackoffHistogramEmpty = metrics.TiKVBackoffHistogram.WithLabelValues("")
-)
-
 func (t BackoffType) metric() prometheus.Observer {
 	switch t {
 	// TODO: distinguish tikv and tiflash in metrics
 	case BoTiKVRPC, BoTiFlashRPC:
-		return tikvBackoffHistogramRPC
+		return metrics.BackoffHistogramRPC
 	case BoTxnLock:
-		return tikvBackoffHistogramLock
+		return metrics.BackoffHistogramLock
 	case BoTxnLockFast:
-		return tikvBackoffHistogramLockFast
+		return metrics.BackoffHistogramLockFast
 	case BoPDRPC:
-		return tikvBackoffHistogramPD
+		return metrics.BackoffHistogramPD
 	case BoRegionMiss:
-		return tikvBackoffHistogramRegionMiss
+		return metrics.BackoffHistogramRegionMiss
 	case boTiKVServerBusy, boTiFlashServerBusy:
-		return tikvBackoffHistogramServerBusy
+		return metrics.BackoffHistogramServerBusy
 	case boStaleCmd:
-		return tikvBackoffHistogramStaleCmd
+		return metrics.BackoffHistogramStaleCmd
 	}
-	return tikvBackoffHistogramEmpty
+	return metrics.BackoffHistogramEmpty
 }
 
 // NewBackoffFn creates a backoff func which implements exponential backoff with
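For context, the Observer returned by metric() is consumed when a backoff actually sleeps. A hedged sketch of a typical call site (observeBackoff and realSleep are hypothetical names; the real accounting lives in the exec loop elsewhere in backoff.go):

```go
// observeBackoff records how long a backoff of type t slept, in seconds.
// The shortcut returned by t.metric() is already bound to its label value,
// so this is a single Observe call with no label lookup on the hot path.
func observeBackoff(t BackoffType, realSleep time.Duration) {
    t.metric().Observe(realSleep.Seconds())
}
```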
3 changes: 2 additions & 1 deletion store/tikv/batch_coprocessor.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv/logutil"
+	"github.com/pingcap/tidb/store/tikv/metrics"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 	"github.com/pingcap/tidb/util/memory"
 	"go.uber.org/zap"
@@ -160,7 +161,7 @@ func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, st
 				zap.Int("range len", rangesLen),
 				zap.Int("task len", len(batchTasks)))
 		}
-		tikvTxnRegionsNumHistogramWithBatchCoprocessor.Observe(float64(len(batchTasks)))
+		metrics.TxnRegionsNumHistogramWithBatchCoprocessor.Observe(float64(len(batchTasks)))
 		return batchTasks, nil
 	}
 }
3 changes: 1 addition & 2 deletions store/tikv/cleanup.go
@@ -26,14 +26,13 @@ import (
 type actionCleanup struct{}
 
 var _ twoPhaseCommitAction = actionCleanup{}
-var tiKVTxnRegionsNumHistogramCleanup = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag("cleanup"))
 
 func (actionCleanup) String() string {
 	return "cleanup"
 }
 
 func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
-	return tiKVTxnRegionsNumHistogramCleanup
+	return metrics.TxnRegionsNumHistogramCleanup
 }
 
 func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
5 changes: 1 addition & 4 deletions store/tikv/commit.go
@@ -31,15 +31,12 @@ type actionCommit struct{ retry bool }
 
 var _ twoPhaseCommitAction = actionCommit{}
 
-var tikvSecondaryLockCleanupFailureCounterCommit = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit")
-var tiKVTxnRegionsNumHistogramCommit = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag("commit"))
-
 func (actionCommit) String() string {
 	return "commit"
 }
 
 func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
-	return tiKVTxnRegionsNumHistogramCommit
+	return metrics.TxnRegionsNumHistogramCommit
 }
 
 func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
8 changes: 2 additions & 6 deletions store/tikv/coprocessor.go
@@ -44,11 +44,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var (
-	tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor")
-	tikvTxnRegionsNumHistogramWithBatchCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor")
-	coprCacheHistogramEvict = tidbmetrics.DistSQLCoprCacheHistogram.WithLabelValues("evict")
-)
+var coprCacheHistogramEvict = tidbmetrics.DistSQLCoprCacheHistogram.WithLabelValues("evict")
 
 // CopClient is coprocessor client.
 type CopClient struct {
@@ -180,7 +176,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 			zap.Int("range len", rangesLen),
 			zap.Int("task len", len(tasks)))
 	}
-	tikvTxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
+	metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
 	return tasks, nil
 }
 
49 changes: 16 additions & 33 deletions store/tikv/lock_resolver.go
@@ -42,23 +42,6 @@ const ResolvedCacheSize = 2048
 // bigTxnThreshold : transaction involves keys exceed this threshold can be treated as `big transaction`.
 const bigTxnThreshold = 16
 
-var (
-	tikvLockResolverCountWithBatchResolve = metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve")
-	tikvLockResolverCountWithExpired = metrics.TiKVLockResolverCounter.WithLabelValues("expired")
-	tikvLockResolverCountWithNotExpired = metrics.TiKVLockResolverCounter.WithLabelValues("not_expired")
-	tikvLockResolverCountWithWaitExpired = metrics.TiKVLockResolverCounter.WithLabelValues("wait_expired")
-	tikvLockResolverCountWithResolve = metrics.TiKVLockResolverCounter.WithLabelValues("resolve")
-	tikvLockResolverCountWithResolveForWrite = metrics.TiKVLockResolverCounter.WithLabelValues("resolve_for_write")
-	tikvLockResolverCountWithResolveAsync = metrics.TiKVLockResolverCounter.WithLabelValues("resolve_async_commit")
-	tikvLockResolverCountWithWriteConflict = metrics.TiKVLockResolverCounter.WithLabelValues("write_conflict")
-	tikvLockResolverCountWithQueryTxnStatus = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status")
-	tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed")
-	tikvLockResolverCountWithQueryTxnStatusRolledBack = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_rolled_back")
-	tikvLockResolverCountWithQueryCheckSecondaryLocks = metrics.TiKVLockResolverCounter.WithLabelValues("query_check_secondary_locks")
-	tikvLockResolverCountWithResolveLocks = metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_locks")
-	tikvLockResolverCountWithResolveLockLite = metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_lock_lite")
-)
-
 // LockResolver resolves locks and also caches resolved txn status.
 type LockResolver struct {
 	store Storage
@@ -237,7 +220,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
 		return true, nil
 	}
 
-	tikvLockResolverCountWithBatchResolve.Inc()
+	metrics.LockResolverCountWithBatchResolve.Inc()
 
 	// The GCWorker kill all ongoing transactions, because it must make sure all
 	// locks have been cleaned before GC.
@@ -254,7 +237,7 @@
 		if _, ok := txnInfos[l.TxnID]; ok {
 			continue
 		}
-		tikvLockResolverCountWithExpired.Inc()
+		metrics.LockResolverCountWithExpired.Inc()
 
 		// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
 		status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l)
@@ -361,9 +344,9 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
 	}
 
 	if forWrite {
-		tikvLockResolverCountWithResolveForWrite.Inc()
+		metrics.LockResolverCountWithResolveForWrite.Inc()
 	} else {
-		tikvLockResolverCountWithResolve.Inc()
+		metrics.LockResolverCountWithResolve.Inc()
 	}
 
 	var pushFail bool
@@ -384,7 +367,7 @@
 		}
 
 		if status.ttl == 0 {
-			tikvLockResolverCountWithExpired.Inc()
+			metrics.LockResolverCountWithExpired.Inc()
 			// If the lock is committed or rollbacked, resolve lock.
 			cleanRegions, exists := cleanTxns[l.TxnID]
 			if !exists {
@@ -406,7 +389,7 @@
 				return err
 			}
 		} else {
-			tikvLockResolverCountWithNotExpired.Inc()
+			metrics.LockResolverCountWithNotExpired.Inc()
 			// If the lock is valid, the txn may be a pessimistic transaction.
 			// Update the txn expire time.
 			msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
@@ -417,7 +400,7 @@
 				// abort current transaction.
 				// This could avoids the deadlock scene of two large transaction.
 				if l.LockType != kvrpcpb.Op_PessimisticLock && l.TxnID > callerStartTS {
-					tikvLockResolverCountWithWriteConflict.Inc()
+					metrics.LockResolverCountWithWriteConflict.Inc()
 					return kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
 				}
 			} else {
@@ -446,7 +429,7 @@
 
 	if msBeforeTxnExpired.value() > 0 && len(pushed) == 0 {
 		// If len(pushed) > 0, the caller will not block on the locks, it push the minCommitTS instead.
-		tikvLockResolverCountWithWaitExpired.Inc()
+		metrics.LockResolverCountWithWaitExpired.Inc()
 	}
 	return msBeforeTxnExpired.value(), pushed, nil
 }
@@ -582,7 +565,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
 		return s, nil
 	}
 
-	tikvLockResolverCountWithQueryTxnStatus.Inc()
+	metrics.LockResolverCountWithQueryTxnStatus.Inc()
 
 	// CheckTxnStatus may meet the following cases:
 	// 1. LOCK
@@ -649,9 +632,9 @@
 		status.ttl = cmdResp.LockTtl
 	} else {
 		if cmdResp.CommitVersion == 0 {
-			tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
+			metrics.LockResolverCountWithQueryTxnStatusRolledBack.Inc()
 		} else {
-			tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
+			metrics.LockResolverCountWithQueryTxnStatusCommitted.Inc()
 		}
 
 		status.commitTS = cmdResp.CommitVersion
@@ -744,7 +727,7 @@ func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys []
 		StartVersion: txnID,
 	}
 	req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq)
-	tikvLockResolverCountWithQueryCheckSecondaryLocks.Inc()
+	metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc()
 	resp, err := lr.store.SendReq(bo, req, curRegionID, readTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
@@ -785,7 +768,7 @@
 
 // resolveLockAsync resolves l assuming it was locked using the async commit protocol.
 func (lr *LockResolver) resolveLockAsync(bo *Backoffer, l *Lock, status TxnStatus) error {
-	tikvLockResolverCountWithResolveAsync.Inc()
+	metrics.LockResolverCountWithResolveAsync.Inc()
 
 	resolveData, err := lr.checkAllSecondaries(bo, l, &status)
 	if err != nil {
@@ -918,7 +901,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *Backoffer, l *Lock, region Region
 }
 
 func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[RegionVerID]struct{}) error {
-	tikvLockResolverCountWithResolveLocks.Inc()
+	metrics.LockResolverCountWithResolveLocks.Inc()
 	resolveLite := lite || l.TxnSize < bigTxnThreshold
 	for {
 		loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
@@ -940,7 +923,7 @@
 		if resolveLite {
 			// Only resolve specified keys when it is a small transaction,
 			// prevent from scanning the whole region in this case.
-			tikvLockResolverCountWithResolveLockLite.Inc()
+			metrics.LockResolverCountWithResolveLockLite.Inc()
 			lreq.Keys = [][]byte{l.Key}
 		}
 		req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
@@ -976,7 +959,7 @@
 }
 
 func (lr *LockResolver) resolvePessimisticLock(bo *Backoffer, l *Lock, cleanRegions map[RegionVerID]struct{}) error {
-	tikvLockResolverCountWithResolveLocks.Inc()
+	metrics.LockResolverCountWithResolveLocks.Inc()
 	for {
 		loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
 		if err != nil {
2 changes: 2 additions & 0 deletions store/tikv/metrics/metrics.go
@@ -344,6 +344,8 @@ func initMetrics(namespace, subsystem string) {
 			Name: "one_pc_txn_counter",
 			Help: "Counter of 1PC transactions.",
 		}, []string{LblType})
+
+	initShortcuts()
 }
 
 func init() {
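The initShortcuts hook added above is defined in a new file that this page truncates before showing (plausibly store/tikv/metrics/shortcuts.go). From the names and label values visible in the hunks, its shape is likely along these lines — a sketch, not the verbatim file:

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

// Shortcuts pre-bound to label values. They are assigned in initShortcuts,
// which initMetrics calls after (re)creating the metric vectors.
var (
    TxnHeartBeatHistogramOK       prometheus.Observer
    TxnHeartBeatHistogramError    prometheus.Observer
    BackoffHistogramRPC           prometheus.Observer
    LockResolverCountWithResolve  prometheus.Counter
    TxnRegionsNumHistogramCleanup prometheus.Observer
    // ... one shortcut per (vector, label value) pair used by store/tikv.
)

func initShortcuts() {
    TxnHeartBeatHistogramOK = TiKVTxnHeartBeatHistogram.WithLabelValues("ok")
    TxnHeartBeatHistogramError = TiKVTxnHeartBeatHistogram.WithLabelValues("err")
    BackoffHistogramRPC = TiKVBackoffHistogram.WithLabelValues("tikvRPC")
    LockResolverCountWithResolve = TiKVLockResolverCounter.WithLabelValues("resolve")
    // "2pc_cleanup" reproduces the old metricsTag("cleanup") helper from 2pc.go.
    TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup")
    // ...
}
```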
… (remaining 7 changed files not shown)
