Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: update kvproto.CheckTxnStatus response #13432

Merged
merged 15 commits into from
Nov 18, 2019
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70 h1:l9VcGUPRHvmM7mkFHo4JqxZeCvioRuL1/4tFUQcs6jQ=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f h1:CJ1IdT7bPbIvyq2Of9VhC/fhEGh6+0ItdT1dPBv7x7I=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9EdmWqzpUxV+bgjB4lOxd3AFxqgoyzQ=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
34 changes: 23 additions & 11 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,40 +661,52 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
}

func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)
startTS := uint64(5 << 18)
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", startTS, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, action, err := s.store.CheckTxnStatus([]byte("pk"), startTS, startTS+100, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed)

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)
s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+101)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))
c.Assert(commitTS, Equals, uint64(startTS+101))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", startTS, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, startTS)

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk1"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_NoAction)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
s.mustPrewriteWithTTLOK(c, putMutations("pk2", "val"), "pk2", startTS, 666)
currentTS := uint64(777 << 18)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk2"), startTS, 0, currentTS, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_TTLExpireRollback)

// Cover the TxnNotFound case.
_, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
_, _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
c.Assert(err, NotNil)
notFound, ok := errors.Cause(err).(*ErrTxnNotFound)
c.Assert(ok, IsTrue)
c.Assert(notFound.StartTs, Equals, uint64(5))
c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_LockNotExistRollback)

// Check the rollback tombstone blocks this prewrite which comes with a smaller startTS.
req := &kvrpcpb.PrewriteRequest{
Expand All @@ -710,7 +722,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
_, _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (uint64, uint64, kvrpcpb.Action, error)
Close() error
}

Expand Down
61 changes: 37 additions & 24 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,10 +1032,13 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64,
rollbackIfNotExist bool) (ttl uint64, commitTS uint64, action kvrpcpb.Action, err error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

action = kvrpcpb.Action_NoAction

startKey := mvccEncode(primaryKey, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
Expand All @@ -1046,9 +1049,11 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
dec := lockDecoder{
expectKey: primaryKey,
}
ok, err := dec.Decode(iter)
var ok bool
ok, err = dec.Decode(iter)
if err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == lockTS {
Expand All @@ -1058,17 +1063,20 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.Action_TTLExpireRollback, nil
}

// If this is a large transaction and the lock is active, push forward the minCommitTS.
// lock.minCommitTS == 0 may be a secondary lock, or not a large transaction.
// lock.minCommitTS == 0 may be a secondary lock, or not a large transaction (old version TiDB).
if lock.minCommitTS > 0 {
action = kvrpcpb.Action_MinCommitTSPushed
// We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1
if lock.minCommitTS < callerStartTS+1 {
lock.minCommitTS = callerStartTS + 1
Expand All @@ -1081,33 +1089,36 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

writeKey := mvccEncode(primaryKey, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, 0, errors.Trace(err)
writeValue, err1 := lock.MarshalBinary()
if err1 != nil {
err = errors.Trace(err1)
return
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 = mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
}
}

return lock.ttl, 0, nil
return lock.ttl, 0, action, nil
}

// If current transaction's lock does not exist.
// If the commit info of the current transaction exists.
c, ok, err := getTxnCommitInfo(iter, primaryKey, lockTS)
if err != nil {
return 0, 0, errors.Trace(err)
c, ok, err1 := getTxnCommitInfo(iter, primaryKey, lockTS)
if err1 != nil {
err = errors.Trace(err1)
return
}
if ok {
// If current transaction is already committed.
if c.valueType != typeRollback {
return 0, c.commitTS, nil
return 0, c.commitTS, action, nil
}
// If current transaction is already rollback.
return 0, 0, nil
return 0, 0, kvrpcpb.Action_NoAction, nil
}
}

Expand All @@ -1120,16 +1131,18 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
if err1 := rollbackLock(batch, primaryKey, lockTS); err1 != nil {
err = errors.Trace(err1)
return
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 := mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.Action_LockNotExistRollback, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
return 0, 0, action, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,11 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
ttl, commitTS, action, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
if err != nil {
resp.Error = convertToKeyError(err)
} else {
resp.LockTtl, resp.CommitVersion = ttl, commitTS
resp.LockTtl, resp.CommitVersion, resp.Action = ttl, commitTS, action
}
return &resp
}
Expand Down
4 changes: 3 additions & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve
type TxnStatus struct {
ttl uint64
commitTS uint64
action kvrpcpb.Action
}
Comment on lines 109 to 113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this definition be moved to a better place in store directory, so those CheckTxnStatus functions can return this struct instead of returning its three fields in a long tuple?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would involve too many irrelevant changes in this PR


// IsCommitted returns true if the txn's final status is Commit.
Expand Down Expand Up @@ -397,7 +398,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
return TxnStatus{l.TTL, 0}, nil
return TxnStatus{ttl: l.TTL}, nil
}

// Handle txnNotFound error.
Expand Down Expand Up @@ -482,6 +483,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
}
status.action = cmdResp.Action
if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
} else {
Expand Down
14 changes: 11 additions & 3 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.ttl, Greater, uint64(0), Commentf("action:%s", status.action))
}

func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
Expand Down Expand Up @@ -234,6 +234,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Check a committed txn.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -279,6 +280,8 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
oracle := s.store.GetOracle()
currentTS, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
c.Assert(currentTS, Greater, txn.StartTS())

bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
resolver := newLockResolver(s.store)
// Call getTxnStatus to check the lock status.
Expand All @@ -287,6 +290,9 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.CommitTS(), Equals, uint64(0))
// TODO: It should be Action_MinCommitTSPushed if minCommitTS is set in the Prewrite request.
// Update here to kvrpcpb.Action_MinCommitTSPushed in the next PR.
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Test the ResolveLocks API
lock := s.mustGetLock(c, []byte("second"))
Expand All @@ -303,10 +309,11 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
// Then call getTxnStatus again and check the lock status.
currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -366,14 +373,15 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
c.Assert(err, IsNil)
lock = &Lock{
Key: []byte("second"),
Primary: []byte("key"),
Primary: []byte("key_not_exist"),
TxnID: startTS,
TTL: 1000,
}
status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_LockNotExistRollback)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
Expand Down