diff --git a/go.mod b/go.mod index 8d04fcf627764..e9b3b6fc1fe35 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20190430075617-bf45ab20bfc4 github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 // indirect github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030 - github.com/pingcap/kvproto v0.0.0-20190801050232-8be8979a1b6d + github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190718031118-20e37a65d718 github.com/pingcap/pd v2.1.12+incompatible diff --git a/go.sum b/go.sum index 73f3543077b4b..63ca807085f68 100644 --- a/go.sum +++ b/go.sum @@ -105,8 +105,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2 github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030 h1:XJLuW0lsP7vAtQ2iPjZwvXZ14m5urp9No+Qr06ZZcTo= github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190801050232-8be8979a1b6d h1:liASf0yI9s8GSoJ40g4eXXoTUVV/3mmTb3dpooQs7PI= -github.com/pingcap/kvproto v0.0.0-20190801050232-8be8979a1b6d/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk= +github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726 h1:AzGIEmaYVYMtmkiuSMuOYjVtQ5IoiGsW7msECI7UPGM= +github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20190718031118-20e37a65d718 h1:raZFhem9Ga8BcuWhQ6daejp5E5rIeyET0oQddyWK2Q0= diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 61566862f3807..f9f7b94899512 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -47,6 +47,7 @@ type mvccLock struct { value []byte op kvrpcpb.Op ttl uint64 + txnSize uint64 } type mvccEntry struct { diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 3e2d12507c516..daba827b19bba 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -503,8 +503,9 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, anyError := false batch := &leveldb.Batch{} errs := make([]error, 0, len(mutations)) + txnSize := len(mutations) for _, m := range mutations { - err := prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl) + err := prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, uint64(txnSize)) errs = append(errs, err) if err != nil { anyError = true @@ -520,7 +521,7 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, return errs } -func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error { +func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64, txnSize uint64) error { startKey := mvccEncode(mutation.Key, lockVer) iter := newIterator(db, &util.Range{ Start: startKey, @@ -559,6 +560,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu value: mutation.Value, op: mutation.GetOp(), ttl: ttl, + txnSize: txnSize, } writeKey := mvccEncode(mutation.Key, lockVer) writeValue, err := lock.MarshalBinary() diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 4a23632366758..5e6813571f50d 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -379,6 +379,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) PrimaryLock: c.primary(), StartVersion: c.startTS, LockTtl: c.lockTTL, + TxnSize: uint64(len(batch.keys)), }, Context: pb.Context{ Priority: c.priority, diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index a0a7ce9a8a256..e022ca3e66e92 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -33,6 +33,9 @@ import ( // ResolvedCacheSize is max number of cached txn status. const ResolvedCacheSize = 2048 +// bigTxnThreshold : transaction involves keys exceed this threshold can be treated as `big transaction`. +const bigTxnThreshold = 16 + // LockResolver resolves locks and also caches resolved txn status. type LockResolver struct { store Storage @@ -113,6 +116,7 @@ type Lock struct { Primary []byte TxnID uint64 TTL uint64 + TxnSize uint64 } func (l *Lock) String() string { @@ -125,11 +129,13 @@ func NewLock(l *kvrpcpb.LockInfo) *Lock { if ttl == 0 { ttl = defaultLockTTL } + txnSize := l.GetTxnSize() return &Lock{ Key: l.GetKey(), Primary: l.GetPrimaryLock(), TxnID: l.GetLockVersion(), TTL: ttl, + TxnSize: txnSize, } } @@ -364,6 +370,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error { metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_locks").Inc() + cleanWholeRegion := l.TxnSize >= bigTxnThreshold for { loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key) if err != nil { @@ -381,6 +388,12 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl if status.IsCommitted() { req.ResolveLock.CommitVersion = status.CommitTS() } + if l.TxnSize < bigTxnThreshold { + // Only resolve specified keys when it is a small transaction, + // prevent from scanning the whole region in this case. + metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_lock_lite").Inc() + req.ResolveLock.Keys = [][]byte{l.Key} + } resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort) if err != nil { return errors.Trace(err) @@ -405,7 +418,9 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl logutil.Logger(context.Background()).Error("resolveLock error", zap.Error(err)) return err } - cleanRegions[loc.Region] = struct{}{} + if cleanWholeRegion { + cleanRegions[loc.Region] = struct{}{} + } return nil } }