Skip to content

Commit

Permalink
*: refine commit txn slow log (pingcap#8731) (pingcap#8874)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and zz-jason committed Dec 29, 2018
1 parent 69fa404 commit e689572
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 13 deletions.
2 changes: 1 addition & 1 deletion distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (r *selectResult) getSelectResp() error {
}
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails())
sc.MergeExecDetails(re.result.GetExecDetails(), nil)
if len(r.selectResp.Chunks) == 0 {
continue
}
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVTxnRegionsNumHistogram)
prometheus.MustRegister(TiKVTxnWriteKVCountHistogram)
prometheus.MustRegister(TiKVTxnWriteSizeHistogram)
prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram)
prometheus.MustRegister(TimeJumpBackCounter)
prometheus.MustRegister(TransactionCounter)
prometheus.MustRegister(TransactionDuration)
Expand Down
9 changes: 9 additions & 0 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,13 @@ var (
Name: "region_cache_operations_total",
Help: "Counter of region cache.",
}, []string{LblType, LblResult})

// TiKVLocalLatchWaitTimeHistogram observes, in seconds, the time spent waiting
// to acquire the local latch. The 20 buckets grow exponentially by a factor of
// 2, starting at 0.0005s (0.5ms).
TiKVLocalLatchWaitTimeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "local_latch_wait_seconds",
Help: "Wait time of a get local latch.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
})
)
8 changes: 7 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/timeutil"
Expand Down Expand Up @@ -378,11 +379,16 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {

func (s *session) CommitTxn(ctx context.Context) error {
stmt := executor.ExecStmt{
Text: "commit",
Text: "commitTxn",
Ctx: s,
StartTime: time.Now(),
}
var commitDetail *execdetails.CommitDetails
ctx = context.WithValue(ctx, execdetails.CommitDetailCtxKey, &commitDetail)
err := s.doCommitWithRetry(ctx)
if commitDetail != nil {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}
stmt.LogSlowQuery(s.sessionVars.TxnCtx.StartTS, err == nil)
label := metrics.LblOK
if err != nil {
Expand Down
17 changes: 10 additions & 7 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,17 @@ func (sc *StatementContext) ResetForRetry() {

// MergeExecDetails merges the execution details of a single region request into
// sc; the accumulated values are used to print the information in the slow query log.
// details may be nil, in which case the per-request counters are left unchanged.
// commitDetails is stored as the CommitDetail of the accumulated details.
func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails) {
func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, commitDetails *execdetails.CommitDetails) {
sc.mu.Lock()
sc.mu.execDetails.ProcessTime += details.ProcessTime
sc.mu.execDetails.WaitTime += details.WaitTime
sc.mu.execDetails.BackoffTime += details.BackoffTime
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
// Guard against a nil details so callers can record commit details only.
if details != nil {
sc.mu.execDetails.ProcessTime += details.ProcessTime
sc.mu.execDetails.WaitTime += details.WaitTime
sc.mu.execDetails.BackoffTime += details.BackoffTime
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
}
// NOTE(review): this assignment is unconditional, so a call that passes a nil
// commitDetails clears any CommitDetail stored by an earlier call — confirm
// that no caller relies on the previous value surviving.
sc.mu.execDetails.CommitDetail = commitDetails
sc.mu.Unlock()
}

Expand Down
25 changes: 21 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand Down Expand Up @@ -76,6 +77,7 @@ type twoPhaseCommitter struct {
syncLog bool
connID uint64 // connID is used for log.
cleanWg sync.WaitGroup
detail *execdetails.CommitDetails
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
Expand Down Expand Up @@ -140,8 +142,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS)
}

metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(len(keys)))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(size))
commitDetail := &execdetails.CommitDetails{WriteSize: size, WriteKeys: len(keys)}
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
Expand All @@ -152,6 +155,7 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
priority: getTxnPriority(txn),
syncLog: getTxnSyncLog(txn),
connID: connID,
detail: commitDetail,
}, nil
}

Expand Down Expand Up @@ -203,6 +207,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
var sizeFunc = c.keySize
if action == actionPrewrite {
sizeFunc = c.keyValueSize
atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups)))
}
// Make sure the group that contains primary key goes first.
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
Expand Down Expand Up @@ -369,10 +374,12 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock)
locks = append(locks, lock)
}
start := time.Now()
ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
if !ok {
err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
Expand Down Expand Up @@ -583,7 +590,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
}()

binlogChan := c.prewriteBinlog()
err := c.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars), c.keys)
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
c.detail.PrewriteTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(prewriteBo.totalSleep) * time.Millisecond
if binlogChan != nil {
binlogErr := <-binlogChan
if binlogErr != nil {
Expand All @@ -595,11 +606,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return errors.Trace(err)
}

start = time.Now()
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
if err != nil {
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
return errors.Trace(err)
}
c.detail.GetCommitTsTime = time.Since(start)

// check commitTS
if commitTS <= c.startTS {
Expand All @@ -618,7 +631,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return errors.Annotate(err, txnRetryableMark)
}

err = c.commitKeys(NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars), c.keys)
start = time.Now()
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
c.detail.CommitTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
Expand Down
17 changes: 17 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/execdetails"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -187,6 +188,17 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
if err != nil || committer == nil {
return errors.Trace(err)
}
defer func() {
ctxValue := ctx.Value(execdetails.CommitDetailCtxKey)
if ctxValue != nil {
commitDetail := ctxValue.(**execdetails.CommitDetails)
if *commitDetail != nil {
(*commitDetail).TxnRetry += 1
} else {
*commitDetail = committer.detail
}
}
}()
// latches disabled
if txn.store.txnLatches == nil {
err = committer.executeAndWriteFinishBinlog(ctx)
Expand All @@ -207,7 +219,12 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
}

// for transactions which need to acquire latches
start = time.Now()
lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
committer.detail.LocalLatchTime = time.Since(start)
if committer.detail.LocalLatchTime > 0 {
metrics.TiKVLocalLatchWaitTimeHistogram.Observe(committer.detail.LocalLatchTime.Seconds())
}
defer txn.store.txnLatches.UnLock(lock)
if lock.IsStale() {
err = errors.Errorf("startTS %d is stale", txn.startTS)
Expand Down
53 changes: 53 additions & 0 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"time"
)

// commitDetailCtxKeyType is the dedicated type of CommitDetailCtxKey. Context
// keys are compared by both type and value, so using a private type instead of
// a plain string guarantees that no other package can collide with this key
// by accident.
type commitDetailCtxKeyType string

// CommitDetailCtxKey is the context key under which a **CommitDetails is
// stored, so that the commit path can hand its CommitDetails back to the
// caller that created the context.
const CommitDetailCtxKey commitDetailCtxKeyType = "commitDetail"

// ExecDetails contains execution detail information.
type ExecDetails struct {
ProcessTime time.Duration
Expand All @@ -29,6 +32,21 @@ type ExecDetails struct {
RequestCount int
TotalKeys int64
ProcessedKeys int64
CommitDetail *CommitDetails
}

// CommitDetails contains commit detail information.
// ExecDetails.String prints each field only when its value is greater than zero.
type CommitDetails struct {
// GetCommitTsTime is the time spent fetching the commit timestamp.
GetCommitTsTime time.Duration
// PrewriteTime is the wall time of the prewrite phase.
PrewriteTime time.Duration
// CommitTime is the wall time of the commit phase.
CommitTime time.Duration
// LocalLatchTime is the time spent waiting to acquire the local latches.
LocalLatchTime time.Duration
// TotalBackoffTime is the summed sleep time of the prewrite and commit backoffers.
TotalBackoffTime time.Duration
// ResolveLockTime is the accumulated time spent resolving locks, in
// nanoseconds. It is a plain int64 because it is updated and read through
// sync/atomic.
ResolveLockTime int64
// WriteKeys is the number of keys written by the transaction.
WriteKeys int
// WriteSize is the size of the transaction's writes (presumably in bytes — TODO confirm).
WriteSize int
// PrewriteRegionNum is the number of key groups (presumably one per region)
// handled by prewrite. It is updated and read through sync/atomic.
PrewriteRegionNum int32
// TxnRetry counts commits that found a CommitDetails already stored in the
// context (presumably transaction retries — TODO confirm).
TxnRetry int
}

// String implements the fmt.Stringer interface.
Expand All @@ -52,6 +70,41 @@ func (d ExecDetails) String() string {
if d.ProcessedKeys > 0 {
parts = append(parts, fmt.Sprintf("processed_keys:%d", d.ProcessedKeys))
}
commitDetails := d.CommitDetail
if commitDetails != nil {
if commitDetails.PrewriteTime > 0 {
parts = append(parts, fmt.Sprintf("prewrite_time:%v", commitDetails.PrewriteTime))
}
if commitDetails.CommitTime > 0 {
parts = append(parts, fmt.Sprintf("commit_time:%v", commitDetails.CommitTime))
}
if commitDetails.GetCommitTsTime > 0 {
parts = append(parts, fmt.Sprintf("get_commit_ts_time:%v", commitDetails.GetCommitTsTime))
}
if commitDetails.TotalBackoffTime > 0 {
parts = append(parts, fmt.Sprintf("total_backoff_time:%v", commitDetails.TotalBackoffTime))
}
resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime)
if resolveLockTime > 0 {
parts = append(parts, fmt.Sprintf("resolve_lock_time:%d", time.Duration(resolveLockTime)))
}
if commitDetails.LocalLatchTime > 0 {
parts = append(parts, fmt.Sprintf("local_latch_wait_time:%v", commitDetails.LocalLatchTime))
}
if commitDetails.WriteKeys > 0 {
parts = append(parts, fmt.Sprintf("write_keys:%d", commitDetails.WriteKeys))
}
if commitDetails.WriteSize > 0 {
parts = append(parts, fmt.Sprintf("write_size:%d", commitDetails.WriteSize))
}
prewriteRegionNum := atomic.LoadInt32(&commitDetails.PrewriteRegionNum)
if prewriteRegionNum > 0 {
parts = append(parts, fmt.Sprintf("prewrite_region:%d", prewriteRegionNum))
}
if commitDetails.TxnRetry > 0 {
parts = append(parts, fmt.Sprintf("txn_retry:%d", commitDetails.TxnRetry))
}
}
return strings.Join(parts, " ")
}

Expand Down

0 comments on commit e689572

Please sign in to comment.