Skip to content

Commit

Permalink
executor: add pessimistic lock keys runtime information (#19547)
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 authored Sep 1, 2020
1 parent bdb673f commit 915d84d
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 83 deletions.
28 changes: 17 additions & 11 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,13 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
seVars := sctx.GetSessionVars()
lockCtx := newLockCtx(seVars, seVars.LockWaitTimeout)
var lockKeyStats *execdetails.LockKeysDetails
ctx = context.WithValue(ctx, execdetails.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
err = txn.LockKeys(ctx, lockCtx, keys...)
if lockKeyStats != nil {
seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats)
}
if err == nil {
return nil
}
Expand Down Expand Up @@ -773,10 +778,21 @@ var (
// 3. record execute duration metric.
// 4. update the `PrevStmt` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults bool) {
sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
if (execDetail.CommitDetail != nil || execDetail.LockKeysDetail != nil) && sessVars.StmtCtx.RuntimeStatsColl != nil {
stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID())
statsWithCommit := &execdetails.RuntimeStatsWithCommit{
RuntimeStats: stats,
Commit: execDetail.CommitDetail,
LockKeys: execDetail.LockKeysDetail,
}
sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
}
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
sessVars := a.Ctx.GetSessionVars()
prevStmt := a.GetTextToLog()
if sessVars.EnableLogDesensitization {
sessVars.PrevStmt = FormatSQL(prevStmt, nil)
Expand Down Expand Up @@ -843,16 +859,6 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
stmtDetail = *(stmtDetailRaw.(*execdetails.StmtExecDetails))
}
execDetail := sessVars.StmtCtx.GetExecDetails()

// Attach commit runtime stats to executor runtime stats.
if execDetail.CommitDetail != nil && sessVars.StmtCtx.RuntimeStatsColl != nil {
stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID())
statsWithCommit := &execdetails.RuntimeStatsWithCommit{
RuntimeStats: stats,
Commit: execDetail.CommitDetail,
}
sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
}
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
Expand Down
8 changes: 7 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,13 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *kv.LockCtx,
if err != nil {
return err
}
return txn.LockKeys(sessionctx.SetCommitCtx(ctx, se), lockCtx, keys...)
var lockKeyStats *execdetails.LockKeysDetails
ctx = context.WithValue(ctx, execdetails.LockKeysDetailCtxKey, &lockKeyStats)
err = txn.LockKeys(sessionctx.SetCommitCtx(ctx, se), lockCtx, keys...)
if lockKeyStats != nil {
sctx.MergeLockKeysExecDetails(lockKeyStats)
}
return err
}

// LimitExec represents limit executor
Expand Down
20 changes: 17 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6242,15 +6242,29 @@ func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
"update t1 set a=a+1 where a=6;",
}

for _, sql := range testSQLs {
tk.MustExec(sql)
getRootStats := func() string {
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(plannercore.Plan)
c.Assert(ok, IsTrue)
stats := tk.Se.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(p.ID())
c.Assert(stats.String(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*")
return stats.String()
}
for _, sql := range testSQLs {
tk.MustExec(sql)
c.Assert(getRootStats(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*")
}

// Test for lock keys stats.
tk.MustExec("begin pessimistic")
tk.MustExec("update t1 set b=b+1")
c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t1 for update").Check(testkit.Rows("5 6", "7 7"))
c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
tk.MustExec("rollback")
}

func (s *testSuite) TestIssue13758(c *C) {
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -269,6 +270,7 @@ type LockCtx struct {
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
Stats *execdetails.LockKeysDetails
}

// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
Expand Down
11 changes: 11 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,17 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
sc.mu.Unlock()
}

// MergeLockKeysExecDetails merges lock keys execution details into self.
func (sc *StatementContext) MergeLockKeysExecDetails(lockKeys *execdetails.LockKeysDetails) {
sc.mu.Lock()
if sc.mu.execDetails.LockKeysDetail == nil {
sc.mu.execDetails.LockKeysDetail = lockKeys
} else {
sc.mu.execDetails.LockKeysDetail.Merge(lockKeys)
}
sc.mu.Unlock()
}

// GetExecDetails gets the execution details for the statement.
func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails {
var details execdetails.ExecDetails
Expand Down
8 changes: 7 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,9 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups)))

var sizeFunc = c.keySize
if _, ok := action.(actionPrewrite); ok {

switch act := action.(type) {
case actionPrewrite:
// Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest.
if len(bo.errors) == 0 {
for _, group := range groups {
Expand All @@ -439,6 +441,10 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
}
sizeFunc = c.keyValueSize
atomic.AddInt32(&c.getDetail().PrewriteRegionNum, int32(len(groups)))
case actionPessimisticLock:
if act.LockCtx.Stats != nil {
act.LockCtx.Stats.RegionNum = int32(len(groups))
}
}

batchBuilder := newBatched(c.primary())
Expand Down
9 changes: 9 additions & 0 deletions store/tikv/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
time.Sleep(300 * time.Millisecond)
return kv.ErrWriteConflict
})
startTime := time.Now()
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime)))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
}
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -146,10 +151,14 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
startTime = time.Now()
msBeforeTxnExpired, _, err := c.store.lockResolver.ResolveLocks(bo, 0, locks)
if err != nil {
return errors.Trace(err)
}
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.ResolveLockTime, int64(time.Since(startTime)))
}

// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
Expand Down
18 changes: 18 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
// Exclude keys that are already locked.
var err error
keys := make([][]byte, 0, len(keysInput))
startTime := time.Now()
txn.mu.Lock()
defer txn.mu.Unlock()
defer func() {
Expand All @@ -358,6 +359,14 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
if lockCtx.LockKeysCount != nil {
*lockCtx.LockKeysCount += int32(len(keys))
}
if lockCtx.Stats != nil {
lockCtx.Stats.TotalTime = time.Since(startTime)
ctxValue := ctx.Value(execdetails.LockKeysDetailCtxKey)
if ctxValue != nil {
lockKeysDetail := ctxValue.(**execdetails.LockKeysDetails)
*lockKeysDetail = lockCtx.Stats
}
}
}()
memBuf := txn.us.GetMemBuffer()
for _, key := range keysInput {
Expand Down Expand Up @@ -405,12 +414,21 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
assignedPrimaryKey = true
}

lockCtx.Stats = &execdetails.LockKeysDetails{
LockKeys: int32(len(keys)),
}
bo := NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
txn.committer.forUpdateTS = lockCtx.ForUpdateTS
// If the number of keys greater than 1, it can be on different region,
// concurrently execute on multiple regions may lead to deadlock.
txn.committer.isFirstLock = txn.lockedCnt == 0 && len(keys) == 1
err = txn.committer.pessimisticLockMutations(bo, lockCtx, CommitterMutations{keys: keys})
if bo.totalSleep > 0 {
atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.totalSleep)*int64(time.Millisecond))
lockCtx.Stats.Mu.Lock()
lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.types...)
lockCtx.Stats.Mu.Unlock()
}
if lockCtx.Killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
Expand Down
Loading

0 comments on commit 915d84d

Please sign in to comment.