Skip to content

Commit

Permalink
support resolve specified lock keys (#10292) (#11889)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjinpeng87 authored and sre-bot committed Aug 27, 2019
1 parent e2b434e commit 452dd0b
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190430075617-bf45ab20bfc4
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 // indirect
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030
github.com/pingcap/kvproto v0.0.0-20190801050232-8be8979a1b6d
github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190718031118-20e37a65d718
github.com/pingcap/pd v2.1.12+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030 h1:XJLuW0lsP7vAtQ2iPjZwvXZ14m5urp9No+Qr06ZZcTo=
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190801050232-8be8979a1b6d h1:liASf0yI9s8GSoJ40g4eXXoTUVV/3mmTb3dpooQs7PI=
github.com/pingcap/kvproto v0.0.0-20190801050232-8be8979a1b6d/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726 h1:AzGIEmaYVYMtmkiuSMuOYjVtQ5IoiGsW7msECI7UPGM=
github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190718031118-20e37a65d718 h1:raZFhem9Ga8BcuWhQ6daejp5E5rIeyET0oQddyWK2Q0=
Expand Down
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type mvccLock struct {
value []byte
op kvrpcpb.Op
ttl uint64
txnSize uint64
}

type mvccEntry struct {
Expand Down
6 changes: 4 additions & 2 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,9 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
anyError := false
batch := &leveldb.Batch{}
errs := make([]error, 0, len(mutations))
txnSize := len(mutations)
for _, m := range mutations {
err := prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl)
err := prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, uint64(txnSize))
errs = append(errs, err)
if err != nil {
anyError = true
Expand All @@ -520,7 +521,7 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
return errs
}

func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error {
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64, txnSize uint64) error {
startKey := mvccEncode(mutation.Key, lockVer)
iter := newIterator(db, &util.Range{
Start: startKey,
Expand Down Expand Up @@ -559,6 +560,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
value: mutation.Value,
op: mutation.GetOp(),
ttl: ttl,
txnSize: txnSize,
}
writeKey := mvccEncode(mutation.Key, lockVer)
writeValue, err := lock.MarshalBinary()
Expand Down
1 change: 1 addition & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
TxnSize: uint64(len(batch.keys)),
},
Context: pb.Context{
Priority: c.priority,
Expand Down
17 changes: 16 additions & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
// ResolvedCacheSize is max number of cached txn status.
const ResolvedCacheSize = 2048

// bigTxnThreshold : transaction involves keys exceed this threshold can be treated as `big transaction`.
const bigTxnThreshold = 16

// LockResolver resolves locks and also caches resolved txn status.
type LockResolver struct {
store Storage
Expand Down Expand Up @@ -113,6 +116,7 @@ type Lock struct {
Primary []byte
TxnID uint64
TTL uint64
TxnSize uint64
}

func (l *Lock) String() string {
Expand All @@ -125,11 +129,13 @@ func NewLock(l *kvrpcpb.LockInfo) *Lock {
if ttl == 0 {
ttl = defaultLockTTL
}
txnSize := l.GetTxnSize()
return &Lock{
Key: l.GetKey(),
Primary: l.GetPrimaryLock(),
TxnID: l.GetLockVersion(),
TTL: ttl,
TxnSize: txnSize,
}
}

Expand Down Expand Up @@ -364,6 +370,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte

func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error {
metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_locks").Inc()
cleanWholeRegion := l.TxnSize >= bigTxnThreshold
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
if err != nil {
Expand All @@ -381,6 +388,12 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl
if status.IsCommitted() {
req.ResolveLock.CommitVersion = status.CommitTS()
}
if l.TxnSize < bigTxnThreshold {
// Only resolve specified keys when it is a small transaction,
// prevent from scanning the whole region in this case.
metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_lock_lite").Inc()
req.ResolveLock.Keys = [][]byte{l.Key}
}
resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
Expand All @@ -405,7 +418,9 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl
logutil.Logger(context.Background()).Error("resolveLock error", zap.Error(err))
return err
}
cleanRegions[loc.Region] = struct{}{}
if cleanWholeRegion {
cleanRegions[loc.Region] = struct{}{}
}
return nil
}
}

0 comments on commit 452dd0b

Please sign in to comment.