Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: update kvproto.CheckTxnStatus response #13432

Merged
merged 15 commits into from
Nov 18, 2019
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20190930153522-6ce02741cba3 h1:3CYI9xg87xNAD+es02gZxbX/ky4KQeoFBsNOzuoAQZg=
github.com/google/pprof v0.0.0-20190930153522-6ce02741cba3/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20191105193234-27840fff0d09 h1:k2LrtvxLSqJVi/o6O71W+AdZgHzU/mNX7kOXzWUORn0=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/shlex v0.0.0-20181106134648-c34317bd91bf/go.mod h1:RpwtwJQFrIEPstU94h88MWPXP2ektJZ8cZ0YntAmXiE=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -191,6 +194,10 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70 h1:l9VcGUPRHvmM7mkFHo4JqxZeCvioRuL1/4tFUQcs6jQ=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e h1:TSRdaTUS0soSRxj/0z+LrqcDRuObQLHfea6ZdUYLw9g=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
34 changes: 23 additions & 11 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,40 +661,52 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
}

func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)
startTS := uint64(5 << 18)
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", startTS, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, _, err := s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)
s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+2)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))
c.Assert(commitTS, Equals, uint64(startTS+2))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", startTS, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, startTS)

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)
var reason kvrpcpb.RollbackReason
ttl, commitTS, reason, err = s.store.CheckTxnStatus([]byte("pk1"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(reason, Equals, kvrpcpb.RollbackReason_NoReason)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
s.mustPrewriteWithTTLOK(c, putMutations("pk2", "val"), "pk2", startTS, 666)
currentTS := uint64(777 << 18)
ttl, commitTS, reason, err = s.store.CheckTxnStatus([]byte("pk2"), startTS, 0, currentTS, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(reason, Equals, kvrpcpb.RollbackReason_TTLExpire)

// Cover the TxnNotFound case.
_, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
_, _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
c.Assert(err, NotNil)
notFound, ok := errors.Cause(err).(*ErrTxnNotFound)
c.Assert(ok, IsTrue)
c.Assert(notFound.StartTs, Equals, uint64(5))
c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
ttl, commitTS, reason, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(reason, Equals, kvrpcpb.RollbackReason_LockNotExist)

// Check the rollback tombstone blocks this prewrite which comes with a smaller startTS.
req := &kvrpcpb.PrewriteRequest{
Expand All @@ -710,7 +722,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
_, _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (uint64, uint64, kvrpcpb.RollbackReason, error)
Close() error
}

Expand Down
58 changes: 35 additions & 23 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,10 +1032,13 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64,
rollbackIfNotExist bool) (ttl uint64, commitTS uint64, rollbackReason kvrpcpb.RollbackReason, err error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

rollbackReason = kvrpcpb.RollbackReason_NoReason

startKey := mvccEncode(primaryKey, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
Expand All @@ -1046,9 +1049,11 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
dec := lockDecoder{
expectKey: primaryKey,
}
ok, err := dec.Decode(iter)
var ok bool
ok, err = dec.Decode(iter)
if err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == lockTS {
Expand All @@ -1058,12 +1063,14 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.RollbackReason_TTLExpire, nil
}

// If this is a large transaction and the lock is active, push forward the minCommitTS.
Expand All @@ -1081,33 +1088,36 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

writeKey := mvccEncode(primaryKey, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, 0, errors.Trace(err)
writeValue, err1 := lock.MarshalBinary()
if err1 != nil {
err = errors.Trace(err1)
return
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 = mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
}
}

return lock.ttl, 0, nil
return lock.ttl, 0, rollbackReason, nil
}

// If current transaction's lock does not exist.
// If the commit info of the current transaction exists.
c, ok, err := getTxnCommitInfo(iter, primaryKey, lockTS)
if err != nil {
return 0, 0, errors.Trace(err)
c, ok, err1 := getTxnCommitInfo(iter, primaryKey, lockTS)
if err1 != nil {
err = errors.Trace(err1)
return
}
if ok {
// If current transaction is already committed.
if c.valueType != typeRollback {
return 0, c.commitTS, nil
return 0, c.commitTS, rollbackReason, nil
}
// If current transaction is already rollback.
return 0, 0, nil
return 0, 0, kvrpcpb.RollbackReason_NoReason, nil
}
}

Expand All @@ -1120,16 +1130,18 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
if err1 := rollbackLock(batch, primaryKey, lockTS); err1 != nil {
err = errors.Trace(err1)
return
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 := mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.RollbackReason_LockNotExist, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
return 0, 0, rollbackReason, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,11 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
ttl, commitTS, rollbackReason, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variable name should update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if err != nil {
resp.Error = convertToKeyError(err)
} else {
resp.LockTtl, resp.CommitVersion = ttl, commitTS
resp.LockTtl, resp.CommitVersion, resp.RollbackReason = ttl, commitTS, rollbackReason
}
return &resp
}
Expand Down
8 changes: 5 additions & 3 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve

// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
type TxnStatus struct {
ttl uint64
commitTS uint64
ttl uint64
commitTS uint64
rollbackReason kvrpcpb.RollbackReason
}

// IsCommitted returns true if the txn's final status is Commit.
Expand Down Expand Up @@ -397,7 +398,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
return TxnStatus{l.TTL, 0}, nil
return TxnStatus{ttl: l.TTL}, nil
}

// Handle txnNotFound error.
Expand Down Expand Up @@ -486,6 +487,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
status.ttl = cmdResp.LockTtl
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
} else {
if cmdResp.CommitVersion == 0 {
status.rollbackReason = cmdResp.RollbackReason
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
} else {
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
Expand Down
7 changes: 5 additions & 2 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.ttl, Greater, uint64(0), Commentf("rollback reason:%s", status.rollbackReason))
}

func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
Expand Down Expand Up @@ -234,6 +234,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.rollbackReason, Equals, kvrpcpb.RollbackReason_NoReason)

// Check a committed txn.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -303,10 +304,11 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
// Then call getTxnStatus again and check the lock status.
currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.rollbackReason, Equals, kvrpcpb.RollbackReason_NoReason)

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -374,6 +376,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.rollbackReason, Equals, kvrpcpb.RollbackReason_LockNotExist)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
Expand Down