Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikvclient: reduce wait backoff time when lock has be expired #10006

Merged
merged 4 commits into from
May 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions store/mockoracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool {
return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL)
}

// UntilExpired implement oracle.Oracle interface.
func (o *MockOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 {
o.RLock()
defer o.RUnlock()
return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(time.Now().Add(o.offset))
}

// Close implements oracle.Oracle interface.
func (o *MockOracle) Close() {

Expand Down
12 changes: 6 additions & 6 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,13 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
locks = append(locks, lock)
}
start := time.Now()
ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
if !ok {
err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -619,12 +619,12 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
locks = append(locks, lock)
}
ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
if !ok {
err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
Expand Down
26 changes: 19 additions & 7 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (t backoffType) Counter() prometheus.Counter {
// NewBackoffFn creates a backoff func which implements exponential backoff with
// optional jitters.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html
func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int {
if base < 2 {
// Top prevent panic in 'rand.Intn'.
base = 2
}
attempts := 0
lastSleep := base
return func(ctx context.Context) int {
return func(ctx context.Context, maxSleepMs int) int {
var sleep int
switch jitter {
case NoJitter:
Expand All @@ -102,8 +102,14 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
logutil.Logger(context.Background()).Debug("backoff",
zap.Int("base", base),
zap.Int("sleep", sleep))

realSleep := sleep
// when set maxSleepMs >= 0 in `tikv.BackoffWithMaxSleep` will force sleep maxSleepMs milliseconds.
if maxSleepMs >= 0 && realSleep > maxSleepMs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the maxSleepMs will be 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxSleepMs never should be 0 in current usage, but backoff is a common function and we add new maxSleepMs Param, so we should take care about it and in logic "force sleep zero" is useful.

realSleep = maxSleepMs
}
select {
case <-time.After(time.Duration(sleep) * time.Millisecond):
case <-time.After(time.Duration(realSleep) * time.Millisecond):
case <-ctx.Done():
}

Expand All @@ -130,7 +136,7 @@ const (
boServerBusy
)

func (t backoffType) createFn(vars *kv.Variables) func(context.Context) int {
func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
if vars.Hook != nil {
vars.Hook(t.String(), vars)
}
Expand Down Expand Up @@ -217,7 +223,7 @@ var CommitMaxBackoff = 41000
type Backoffer struct {
ctx context.Context

fn map[backoffType]func(context.Context) int
fn map[backoffType]func(context.Context, int) int
maxSleep int
totalSleep int
errors []error
Expand Down Expand Up @@ -253,6 +259,12 @@ func (b *Backoffer) WithVars(vars *kv.Variables) *Backoffer {
// Backoff sleeps a while base on the backoffType and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ backoffType, err error) error {
return b.BackoffWithMaxSleep(typ, -1, err)
}

// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err error) error {
if strings.Contains(err.Error(), mismatchClusterID) {
logutil.Logger(context.Background()).Fatal("critical error", zap.Error(err))
}
Expand All @@ -265,15 +277,15 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error {
typ.Counter().Inc()
// Lazy initialize.
if b.fn == nil {
b.fn = make(map[backoffType]func(context.Context) int)
b.fn = make(map[backoffType]func(context.Context, int) int)
}
f, ok := b.fn[typ]
if !ok {
f = typ.createFn(b.vars)
b.fn[typ] = f
}

b.totalSleep += f(b.ctx)
b.totalSleep += f(b.ctx, maxSleepMs)
b.types = append(b.types, typ)

var startTs interface{}
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,12 +778,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.Logger(context.Background()).Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
msBeforeExpired, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
}
if !ok {
if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil {
if msBeforeExpired > 0 {
if err := bo.BackoffWithMaxSleep(boTxnLockFast, int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
return nil, errors.Trace(err)
}
}
Expand Down
33 changes: 23 additions & 10 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
tikvLockResolverCountWithBatchResolve = metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve")
tikvLockResolverCountWithExpired = metrics.TiKVLockResolverCounter.WithLabelValues("expired")
tikvLockResolverCountWithNotExpired = metrics.TiKVLockResolverCounter.WithLabelValues("not_expired")
tikvLockResolverCountWithWaitExpired = metrics.TiKVLockResolverCounter.WithLabelValues("wait_expired")
tikvLockResolverCountWithResolve = metrics.TiKVLockResolverCounter.WithLabelValues("resolve")
tikvLockResolverCountWithQueryTxnStatus = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status")
tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed")
Expand Down Expand Up @@ -265,47 +266,59 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (ok bool, err error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

msBeforeTxnExpired stands for "milliseconds before transaction expired"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems yes.

if len(locks) == 0 {
return true, nil
return
}

tikvLockResolverCountWithResolve.Inc()

var expiredLocks []*Lock
for _, l := range locks {
if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
tikvLockResolverCountWithExpired.Inc()
expiredLocks = append(expiredLocks, l)
} else {
if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
msBeforeTxnExpired = msBeforeLockExpired
}
tikvLockResolverCountWithNotExpired.Inc()
}
}
if len(expiredLocks) == 0 {
return false, nil
if msBeforeTxnExpired > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return
}

// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
var status TxnStatus
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary)
if err != nil {
return false, errors.Trace(err)
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
}

cleanRegions := cleanTxns[l.TxnID]
if cleanRegions == nil {
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}

err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
return false, errors.Trace(err)
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
}
}
return len(expiredLocks) == len(locks), nil
return
}

// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
Expand Down
1 change: 1 addition & 0 deletions store/tikv/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Oracle interface {
GetTimestamp(ctx context.Context) (uint64, error)
GetTimestampAsync(ctx context.Context) Future
IsExpired(lockTimestamp uint64, TTL uint64) bool
UntilExpired(lockTimeStamp uint64, TTL uint64) int64
Close()
}

Expand Down
54 changes: 54 additions & 0 deletions store/tikv/oracle/oracles/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package oracles

import (
"time"

"github.com/pingcap/tidb/store/tikv/oracle"
)

// SetOracleHookCurrentTime exports localOracle's time hook to test.
func SetOracleHookCurrentTime(oc oracle.Oracle, t time.Time) {
switch o := oc.(type) {
case *localOracle:
if o.hook == nil {
o.hook = &struct {
currentTime time.Time
}{}
}
o.hook.currentTime = t
}
}

// ClearOracleHook exports localOracle's clear hook method
func ClearOracleHook(oc oracle.Oracle) {
switch o := oc.(type) {
case *localOracle:
o.hook = nil
}
}

// NewEmptyPDOracle exports pdOracle struct to test
func NewEmptyPDOracle() oracle.Oracle {
return &pdOracle{}
}

// SetEmptyPDOracleLastTs exports PD oracle's last ts to test.
func SetEmptyPDOracleLastTs(oc oracle.Oracle, ts uint64) {
switch o := oc.(type) {
case *pdOracle:
o.lastTS = ts
}
}
24 changes: 22 additions & 2 deletions store/tikv/oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type localOracle struct {
sync.Mutex
lastTimeStampTS uint64
n uint64
hook *struct {
currentTime time.Time
}
}

// NewLocalOracle creates an Oracle that uses local time as data source.
Expand All @@ -35,13 +38,21 @@ func NewLocalOracle() oracle.Oracle {
}

func (l *localOracle) IsExpired(lockTS uint64, TTL uint64) bool {
return oracle.GetPhysical(time.Now()) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
now := time.Now()
if l.hook != nil {
now = l.hook.currentTime
}
return oracle.GetPhysical(now) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
}

func (l *localOracle) GetTimestamp(context.Context) (uint64, error) {
l.Lock()
defer l.Unlock()
physical := oracle.GetPhysical(time.Now())
now := time.Now()
if l.hook != nil {
now = l.hook.currentTime
}
physical := oracle.GetPhysical(now)
ts := oracle.ComposeTS(physical, 0)
if l.lastTimeStampTS == ts {
l.n++
Expand All @@ -68,5 +79,14 @@ func (f *future) Wait() (uint64, error) {
return f.l.GetTimestamp(f.ctx)
}

// UntilExpired implement oracle.Oracle interface.
func (l *localOracle) UntilExpired(lockTimeStamp uint64, TTL uint64) int64 {
lysu marked this conversation as resolved.
Show resolved Hide resolved
now := time.Now()
if l.hook != nil {
now = l.hook.currentTime
}
return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(now)
}

func (l *localOracle) Close() {
}
26 changes: 21 additions & 5 deletions store/tikv/oracle/oracles/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package oracles
package oracles_test

import (
"context"
"testing"
"time"

"github.com/pingcap/tidb/store/tikv/oracle/oracles"
)

func TestLocalOracle(t *testing.T) {
l := NewLocalOracle()
l := oracles.NewLocalOracle()
defer l.Close()
m := map[uint64]struct{}{}
for i := 0; i < 100000; i++ {
Expand All @@ -37,11 +39,13 @@ func TestLocalOracle(t *testing.T) {
}

func TestIsExpired(t *testing.T) {
o := NewLocalOracle()
o := oracles.NewLocalOracle()
defer o.Close()
start := time.Now()
oracles.SetOracleHookCurrentTime(o, start)
ts, _ := o.GetTimestamp(context.Background())
time.Sleep(50 * time.Millisecond)
expire := o.IsExpired(uint64(ts), 40)
oracles.SetOracleHookCurrentTime(o, start.Add(10*time.Millisecond))
expire := o.IsExpired(uint64(ts), 5)
if !expire {
t.Error("should expired")
}
Expand All @@ -50,3 +54,15 @@ func TestIsExpired(t *testing.T) {
t.Error("should not expired")
}
}

func TestLocalOracle_UntilExpired(t *testing.T) {
o := oracles.NewLocalOracle()
defer o.Close()
start := time.Now()
oracles.SetOracleHookCurrentTime(o, start)
ts, _ := o.GetTimestamp(context.Background())
oracles.SetOracleHookCurrentTime(o, start.Add(10*time.Millisecond))
if o.UntilExpired(uint64(ts), 5) != -5 || o.UntilExpired(uint64(ts), 15) != 5 {
t.Error("until expired should be +-5")
}
}
Loading