diff --git a/distsql/select_result.go b/distsql/select_result.go index 4b77eff02a800..1eef97a380d79 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -159,7 +159,7 @@ func (r *selectResult) getSelectResp() error { } r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ - sc.MergeExecDetails(re.result.GetExecDetails()) + sc.MergeExecDetails(re.result.GetExecDetails(), nil) if len(r.selectResp.Chunks) == 0 { continue } diff --git a/metrics/metrics.go b/metrics/metrics.go index ef822d4b8a668..5d150c158b165 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -113,6 +113,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVTxnRegionsNumHistogram) prometheus.MustRegister(TiKVTxnWriteKVCountHistogram) prometheus.MustRegister(TiKVTxnWriteSizeHistogram) + prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram) prometheus.MustRegister(TimeJumpBackCounter) prometheus.MustRegister(TransactionCounter) prometheus.MustRegister(TransactionDuration) diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go index 30989dbd91af4..bce08635c6cc4 100644 --- a/metrics/tikvclient.go +++ b/metrics/tikvclient.go @@ -178,4 +178,13 @@ var ( Name: "region_cache_operations_total", Help: "Counter of region cache.", }, []string{LblType, LblResult}) + + TiKVLocalLatchWaitTimeHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "tikvclient", + Name: "local_latch_wait_seconds", + Help: "Wait time of a get local latch.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), + }) ) diff --git a/session/session.go b/session/session.go index 4fdada8734a77..50b807d13dbef 100644 --- a/session/session.go +++ b/session/session.go @@ -53,6 +53,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/timeutil" @@ -378,11 +379,16 @@ func (s *session) doCommitWithRetry(ctx context.Context) error { func (s *session) CommitTxn(ctx context.Context) error { stmt := executor.ExecStmt{ - Text: "commit", + Text: "commitTxn", Ctx: s, StartTime: time.Now(), } + var commitDetail *execdetails.CommitDetails + ctx = context.WithValue(ctx, execdetails.CommitDetailCtxKey, &commitDetail) err := s.doCommitWithRetry(ctx) + if commitDetail != nil { + s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail) + } stmt.LogSlowQuery(s.sessionVars.TxnCtx.StartTS, err == nil) label := metrics.LblOK if err != nil { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 7f521dd1d28bd..1817f24396faf 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -253,14 +253,17 @@ func (sc *StatementContext) ResetForRetry() { // MergeExecDetails merges a single region execution details into self, used to print // the information in slow query log. -func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails) { +func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, commitDetails *execdetails.CommitDetails) { sc.mu.Lock() - sc.mu.execDetails.ProcessTime += details.ProcessTime - sc.mu.execDetails.WaitTime += details.WaitTime - sc.mu.execDetails.BackoffTime += details.BackoffTime - sc.mu.execDetails.RequestCount++ - sc.mu.execDetails.TotalKeys += details.TotalKeys - sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys + if details != nil { + sc.mu.execDetails.ProcessTime += details.ProcessTime + sc.mu.execDetails.WaitTime += details.WaitTime + sc.mu.execDetails.BackoffTime += details.BackoffTime + sc.mu.execDetails.RequestCount++ + sc.mu.execDetails.TotalKeys += details.TotalKeys + sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys + } + sc.mu.execDetails.CommitDetail = commitDetails sc.mu.Unlock() } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 382663e0d3671..ee821dfeb3c57 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/execdetails" binlog "github.com/pingcap/tipb/go-binlog" log "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -76,6 +77,7 @@ type twoPhaseCommitter struct { syncLog bool connID uint64 // connID is used for log. cleanWg sync.WaitGroup + detail *execdetails.CommitDetails } // newTwoPhaseCommitter creates a twoPhaseCommitter. @@ -140,8 +142,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS) } - metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(len(keys))) - metrics.TiKVTxnWriteSizeHistogram.Observe(float64(size)) + commitDetail := &execdetails.CommitDetails{WriteSize: size, WriteKeys: len(keys)} + metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys)) + metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) return &twoPhaseCommitter{ store: txn.store, txn: txn, @@ -152,6 +155,7 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro priority: getTxnPriority(txn), syncLog: getTxnSyncLog(txn), connID: connID, + detail: commitDetail, }, nil } @@ -203,6 +207,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA var sizeFunc = c.keySize if action == actionPrewrite { sizeFunc = c.keyValueSize + atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups))) } // Make sure the group that contains primary key goes first. batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize) @@ -369,10 +374,12 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock) locks = append(locks, lock) } + start := time.Now() ok, err := c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } + atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start))) if !ok { err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { @@ -583,7 +590,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { }() binlogChan := c.prewriteBinlog() - err := c.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars), c.keys) + prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars) + start := time.Now() + err := c.prewriteKeys(prewriteBo, c.keys) + c.detail.PrewriteTime = time.Since(start) + c.detail.TotalBackoffTime += time.Duration(prewriteBo.totalSleep) * time.Millisecond if binlogChan != nil { binlogErr := <-binlogChan if binlogErr != nil { @@ -595,11 +606,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { return errors.Trace(err) } + start = time.Now() commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars)) if err != nil { log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS) return errors.Trace(err) } + c.detail.GetCommitTsTime = time.Since(start) // check commitTS if commitTS <= c.startTS { @@ -618,7 +631,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { return errors.Annotate(err, txnRetryableMark) } - err = c.commitKeys(NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars), c.keys) + start = time.Now() + commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars) + err = c.commitKeys(commitBo, c.keys) + c.detail.CommitTime = time.Since(start) + c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond if err != nil { if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil { log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 6fb62c0be767e..9f55fc84f7f2a 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/execdetails" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -187,6 +188,17 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { if err != nil || committer == nil { return errors.Trace(err) } + defer func() { + ctxValue := ctx.Value(execdetails.CommitDetailCtxKey) + if ctxValue != nil { + commitDetail := ctxValue.(**execdetails.CommitDetails) + if *commitDetail != nil { + (*commitDetail).TxnRetry += 1 + } else { + *commitDetail = committer.detail + } + } + }() // latches disabled if txn.store.txnLatches == nil { err = committer.executeAndWriteFinishBinlog(ctx) @@ -207,7 +219,12 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { } // for transactions which need to acquire latches + start = time.Now() lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys) + committer.detail.LocalLatchTime = time.Since(start) + if committer.detail.LocalLatchTime > 0 { + metrics.TiKVLocalLatchWaitTimeHistogram.Observe(committer.detail.LocalLatchTime.Seconds()) + } defer txn.store.txnLatches.UnLock(lock) if lock.IsStale() { err = errors.Errorf("startTS %d is stale", txn.startTS) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index ad384c51e0a7b..e128ba140d557 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -21,6 +21,9 @@ import ( "time" ) +// CommitDetailCtxKey presents CommitDetail info key in context. +const CommitDetailCtxKey = "commitDetail" + // ExecDetails contains execution detail information. type ExecDetails struct { ProcessTime time.Duration @@ -29,6 +32,21 @@ type ExecDetails struct { RequestCount int TotalKeys int64 ProcessedKeys int64 + CommitDetail *CommitDetails +} + +// CommitDetails contains commit detail information. +type CommitDetails struct { + GetCommitTsTime time.Duration + PrewriteTime time.Duration + CommitTime time.Duration + LocalLatchTime time.Duration + TotalBackoffTime time.Duration + ResolveLockTime int64 + WriteKeys int + WriteSize int + PrewriteRegionNum int32 + TxnRetry int } // String implements the fmt.Stringer interface. @@ -52,6 +70,41 @@ func (d ExecDetails) String() string { if d.ProcessedKeys > 0 { parts = append(parts, fmt.Sprintf("processed_keys:%d", d.ProcessedKeys)) } + commitDetails := d.CommitDetail + if commitDetails != nil { + if commitDetails.PrewriteTime > 0 { + parts = append(parts, fmt.Sprintf("prewrite_time:%v", commitDetails.PrewriteTime)) + } + if commitDetails.CommitTime > 0 { + parts = append(parts, fmt.Sprintf("commit_time:%v", commitDetails.CommitTime)) + } + if commitDetails.GetCommitTsTime > 0 { + parts = append(parts, fmt.Sprintf("get_commit_ts_time:%v", commitDetails.GetCommitTsTime)) + } + if commitDetails.TotalBackoffTime > 0 { + parts = append(parts, fmt.Sprintf("total_backoff_time:%v", commitDetails.TotalBackoffTime)) + } + resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime) + if resolveLockTime > 0 { + parts = append(parts, fmt.Sprintf("resolve_lock_time:%d", time.Duration(resolveLockTime))) + } + if commitDetails.LocalLatchTime > 0 { + parts = append(parts, fmt.Sprintf("local_latch_wait_time:%v", commitDetails.LocalLatchTime)) + } + if commitDetails.WriteKeys > 0 { + parts = append(parts, fmt.Sprintf("write_keys:%d", commitDetails.WriteKeys)) + } + if commitDetails.WriteSize > 0 { + parts = append(parts, fmt.Sprintf("write_size:%d", commitDetails.WriteSize)) + } + prewriteRegionNum := atomic.LoadInt32(&commitDetails.PrewriteRegionNum) + if prewriteRegionNum > 0 { + parts = append(parts, fmt.Sprintf("prewrite_region:%d", prewriteRegionNum)) + } + if commitDetails.TxnRetry > 0 { + parts = append(parts, fmt.Sprintf("txn_retry:%d", commitDetails.TxnRetry)) + } + } return strings.Join(parts, " ") }