Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refine commit txn slow log (#8731) #8874

Merged
merged 2 commits into from
Dec 29, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (r *selectResult) getSelectResp() error {
}
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails())
sc.MergeExecDetails(re.result.GetExecDetails(), nil)
if len(r.selectResp.Chunks) == 0 {
continue
}
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVTxnRegionsNumHistogram)
prometheus.MustRegister(TiKVTxnWriteKVCountHistogram)
prometheus.MustRegister(TiKVTxnWriteSizeHistogram)
prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram)
prometheus.MustRegister(TimeJumpBackCounter)
prometheus.MustRegister(TransactionCounter)
prometheus.MustRegister(TransactionDuration)
Expand Down
9 changes: 9 additions & 0 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,13 @@ var (
Name: "region_cache_operations_total",
Help: "Counter of region cache.",
}, []string{LblType, LblResult})

TiKVLocalLatchWaitTimeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "local_latch_wait_seconds",
Help: "Wait time of a get local latch.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
})
)
8 changes: 7 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/timeutil"
Expand Down Expand Up @@ -378,11 +379,16 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {

func (s *session) CommitTxn(ctx context.Context) error {
stmt := executor.ExecStmt{
Text: "commit",
Text: "commitTxn",
Ctx: s,
StartTime: time.Now(),
}
var commitDetail *execdetails.CommitDetails
ctx = context.WithValue(ctx, execdetails.CommitDetailCtxKey, &commitDetail)
err := s.doCommitWithRetry(ctx)
if commitDetail != nil {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}
stmt.LogSlowQuery(s.sessionVars.TxnCtx.StartTS, err == nil)
label := metrics.LblOK
if err != nil {
Expand Down
17 changes: 10 additions & 7 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,17 @@ func (sc *StatementContext) ResetForRetry() {

// MergeExecDetails merges a single region execution details into self, used to print
// the information in slow query log.
func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails) {
func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, commitDetails *execdetails.CommitDetails) {
sc.mu.Lock()
sc.mu.execDetails.ProcessTime += details.ProcessTime
sc.mu.execDetails.WaitTime += details.WaitTime
sc.mu.execDetails.BackoffTime += details.BackoffTime
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
if details != nil {
sc.mu.execDetails.ProcessTime += details.ProcessTime
sc.mu.execDetails.WaitTime += details.WaitTime
sc.mu.execDetails.BackoffTime += details.BackoffTime
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
}
sc.mu.execDetails.CommitDetail = commitDetails
sc.mu.Unlock()
}

Expand Down
25 changes: 21 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand Down Expand Up @@ -76,6 +77,7 @@ type twoPhaseCommitter struct {
syncLog bool
connID uint64 // connID is used for log.
cleanWg sync.WaitGroup
detail *execdetails.CommitDetails
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
Expand Down Expand Up @@ -140,8 +142,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS)
}

metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(len(keys)))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(size))
commitDetail := &execdetails.CommitDetails{WriteSize: size, WriteKeys: len(keys)}
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
Expand All @@ -152,6 +155,7 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
priority: getTxnPriority(txn),
syncLog: getTxnSyncLog(txn),
connID: connID,
detail: commitDetail,
}, nil
}

Expand Down Expand Up @@ -203,6 +207,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
var sizeFunc = c.keySize
if action == actionPrewrite {
sizeFunc = c.keyValueSize
atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups)))
}
// Make sure the group that contains primary key goes first.
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
Expand Down Expand Up @@ -369,10 +374,12 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock)
locks = append(locks, lock)
}
start := time.Now()
ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
if !ok {
err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
Expand Down Expand Up @@ -583,7 +590,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
}()

binlogChan := c.prewriteBinlog()
err := c.prewriteKeys(NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars), c.keys)
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
c.detail.PrewriteTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(prewriteBo.totalSleep) * time.Millisecond
if binlogChan != nil {
binlogErr := <-binlogChan
if binlogErr != nil {
Expand All @@ -595,11 +606,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return errors.Trace(err)
}

start = time.Now()
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
if err != nil {
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
return errors.Trace(err)
}
c.detail.GetCommitTsTime = time.Since(start)

// check commitTS
if commitTS <= c.startTS {
Expand All @@ -618,7 +631,11 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return errors.Annotate(err, txnRetryableMark)
}

err = c.commitKeys(NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars), c.keys)
start = time.Now()
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
c.detail.CommitTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
Expand Down
17 changes: 17 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/execdetails"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -187,6 +188,17 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
if err != nil || committer == nil {
return errors.Trace(err)
}
defer func() {
ctxValue := ctx.Value(execdetails.CommitDetailCtxKey)
if ctxValue != nil {
commitDetail := ctxValue.(**execdetails.CommitDetails)
if *commitDetail != nil {
(*commitDetail).TxnRetry += 1
} else {
*commitDetail = committer.detail
}
}
}()
// latches disabled
if txn.store.txnLatches == nil {
err = committer.executeAndWriteFinishBinlog(ctx)
Expand All @@ -207,7 +219,12 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
}

// for transactions which need to acquire latches
start = time.Now()
lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
committer.detail.LocalLatchTime = time.Since(start)
if committer.detail.LocalLatchTime > 0 {
metrics.TiKVLocalLatchWaitTimeHistogram.Observe(committer.detail.LocalLatchTime.Seconds())
}
defer txn.store.txnLatches.UnLock(lock)
if lock.IsStale() {
err = errors.Errorf("startTS %d is stale", txn.startTS)
Expand Down
53 changes: 53 additions & 0 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"time"
)

// CommitDetailCtxKey presents CommitDetail info key in context.
const CommitDetailCtxKey = "commitDetail"

// ExecDetails contains execution detail information.
type ExecDetails struct {
ProcessTime time.Duration
Expand All @@ -29,6 +32,21 @@ type ExecDetails struct {
RequestCount int
TotalKeys int64
ProcessedKeys int64
CommitDetail *CommitDetails
}

// CommitDetails contains commit detail information.
type CommitDetails struct {
GetCommitTsTime time.Duration
PrewriteTime time.Duration
CommitTime time.Duration
LocalLatchTime time.Duration
TotalBackoffTime time.Duration
ResolveLockTime int64
WriteKeys int
WriteSize int
PrewriteRegionNum int32
TxnRetry int
}

// String implements the fmt.Stringer interface.
Expand All @@ -52,6 +70,41 @@ func (d ExecDetails) String() string {
if d.ProcessedKeys > 0 {
parts = append(parts, fmt.Sprintf("processed_keys:%d", d.ProcessedKeys))
}
commitDetails := d.CommitDetail
if commitDetails != nil {
if commitDetails.PrewriteTime > 0 {
parts = append(parts, fmt.Sprintf("prewrite_time:%v", commitDetails.PrewriteTime))
}
if commitDetails.CommitTime > 0 {
parts = append(parts, fmt.Sprintf("commit_time:%v", commitDetails.CommitTime))
}
if commitDetails.GetCommitTsTime > 0 {
parts = append(parts, fmt.Sprintf("get_commit_ts_time:%v", commitDetails.GetCommitTsTime))
}
if commitDetails.TotalBackoffTime > 0 {
parts = append(parts, fmt.Sprintf("total_backoff_time:%v", commitDetails.TotalBackoffTime))
}
resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime)
if resolveLockTime > 0 {
parts = append(parts, fmt.Sprintf("resolve_lock_time:%d", time.Duration(resolveLockTime)))
}
if commitDetails.LocalLatchTime > 0 {
parts = append(parts, fmt.Sprintf("local_latch_wait_time:%v", commitDetails.LocalLatchTime))
}
if commitDetails.WriteKeys > 0 {
parts = append(parts, fmt.Sprintf("write_keys:%d", commitDetails.WriteKeys))
}
if commitDetails.WriteSize > 0 {
parts = append(parts, fmt.Sprintf("write_size:%d", commitDetails.WriteSize))
}
prewriteRegionNum := atomic.LoadInt32(&commitDetails.PrewriteRegionNum)
if prewriteRegionNum > 0 {
parts = append(parts, fmt.Sprintf("prewrite_region:%d", prewriteRegionNum))
}
if commitDetails.TxnRetry > 0 {
parts = append(parts, fmt.Sprintf("txn_retry:%d", commitDetails.TxnRetry))
}
}
return strings.Join(parts, " ")
}

Expand Down