diff --git a/go.mod b/go.mod index 2b5696b9c9b7d..68efd2eee1db2 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/errors v0.11.0 github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32 + github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562 github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 github.com/pingcap/parser v0.0.0-20190218033509-9545f168ae97 github.com/pingcap/pd v2.1.0-rc.4+incompatible diff --git a/go.sum b/go.sum index b407d1de7de69..403c6e5a729c6 100644 --- a/go.sum +++ b/go.sum @@ -109,8 +109,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2 github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32 h1:9uwqk2DvsAKImRKYAjERMuIf5ZiCcNFhaFhgFRXw7X0= -github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562 h1:32oF1/8lVnBR2JVcCAnKPQATTOX0+ckRDFpjQk4Ngno= +github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/parser v0.0.0-20190218033509-9545f168ae97 h1:GIhPQAwFwnf6cSdVYXdSNkx171Nl9ZmIVYrOtN3I2lw= diff --git a/kv/union_store.go b/kv/union_store.go index b32792c63f199..b73f1c28de0ae 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -14,8 +14,6 @@ package kv import ( - "bytes" - "github.com/pingcap/errors" ) @@ -23,9 +21,8 @@ import ( // Also, it provides some transaction related utilities. type UnionStore interface { MemBuffer - // CheckLazyConditionPairs loads all lazy values from store then checks if all values are matched. - // Lazy condition pairs should be checked before transaction commit. - CheckLazyConditionPairs() error + // Returns related condition pair + LookupConditionPair(k Key) *conditionPair // WalkBuffer iterates all buffered kv pairs. WalkBuffer(f func(k Key, v []byte) error) error // SetOption sets an option with a value, when val is nil, uses the default @@ -56,6 +53,14 @@ type conditionPair struct { err error } +func (c *conditionPair) ShouldNotExist() bool { + return len(c.value) == 0 +} + +func (c *conditionPair) Err() error { + return c.err +} + // unionStore is an in-memory Store which contains a buffer for write and a // snapshot for read. type unionStore struct { @@ -201,30 +206,9 @@ func (us *unionStore) markLazyConditionPair(k Key, v []byte, e error) { } } -// CheckLazyConditionPairs implements the UnionStore interface. -func (us *unionStore) CheckLazyConditionPairs() error { - if len(us.lazyConditionPairs) == 0 { - return nil - } - keys := make([]Key, 0, len(us.lazyConditionPairs)) - for _, v := range us.lazyConditionPairs { - keys = append(keys, v.key) - } - values, err := us.snapshot.BatchGet(keys) - if err != nil { - return errors.Trace(err) - } - - for k, v := range us.lazyConditionPairs { - if len(v.value) == 0 { - if _, exist := values[k]; exist { - return errors.Trace(v.err) - } - } else { - if !bytes.Equal(values[k], v.value) { - return errors.Trace(ErrLazyConditionPairsNotMatch) - } - } +func (us *unionStore) LookupConditionPair(k Key) *conditionPair { + if c, ok := us.lazyConditionPairs[string(k)]; ok { + return c } return nil } diff --git a/kv/union_store_test.go b/kv/union_store_test.go index 3297b14f969bc..e23d98776c8ea 100644 --- a/kv/union_store_test.go +++ b/kv/union_store_test.go @@ -121,8 +121,15 @@ func (s *testUnionStoreSuite) TestLazyConditionCheck(c *C) { _, err = s.us.Get([]byte("2")) c.Assert(terror.ErrorEqual(err, ErrNotExist), IsTrue, Commentf("err %v", err)) - err = s.us.CheckLazyConditionPairs() - c.Assert(err, NotNil) + condionPair1 := s.us.LookupConditionPair([]byte("1")) + c.Assert(condionPair1, IsNil) + + condionPair2 := s.us.LookupConditionPair([]byte("2")) + c.Assert(condionPair2, NotNil) + c.Assert(condionPair2.ShouldNotExist(), IsTrue) + + err2 := s.us.LookupConditionPair([]byte("2")).Err() + c.Assert(terror.ErrorEqual(err2, ErrNotExist), IsTrue, Commentf("err %v", err2)) } func checkIterator(c *C, iter Iterator, keys [][]byte, values [][]byte) { diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index 4a674b52ccf33..584d581fd50b6 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -29,6 +29,16 @@ func (e *ErrLocked) Error() string { return fmt.Sprintf("key is locked, key: %q, primary: %q, startTS: %v", e.Key, e.Primary, e.StartTS) } +// ErrKeyAlreadyExist is returned when key exists but this key has a constraint that +// it should not exist. Client should return duplicated entry error. +type ErrKeyAlreadyExist struct { + Key []byte +} + +func (e *ErrKeyAlreadyExist) Error() string { + return fmt.Sprintf("key already exist, key: %q", e.Key) +} + // ErrRetryable suggests that client may restart the txn. e.g. write conflict. type ErrRetryable string diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 85739251aa608..920852cb16dc8 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -505,7 +505,25 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, batch := &leveldb.Batch{} errs := make([]error, 0, len(mutations)) for _, m := range mutations { - err := prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl) + // If the operation is Insert, check if key is exists at first. + var err error + if m.GetOp() == kvrpcpb.Op_Insert { + v, err := mvcc.getValue(m.Key, startTS, kvrpcpb.IsolationLevel_SI) + if err != nil { + errs = append(errs, err) + anyError = true + continue + } + if v != nil { + err = &ErrKeyAlreadyExist{ + Key: m.Key, + } + errs = append(errs, err) + anyError = true + continue + } + } + err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl) errs = append(errs, err) if err != nil { anyError = true @@ -554,11 +572,15 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu return ErrRetryable("write conflict") } + op := mutation.GetOp() + if op == kvrpcpb.Op_Insert { + op = kvrpcpb.Op_Put + } lock := mvccLock{ startTS: startTS, primary: primary, value: mutation.Value, - op: mutation.GetOp(), + op: op, ttl: ttl, } writeKey := mvccEncode(mutation.Key, lockVer) diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 14b314f200aef..8fe156c56ffa6 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -56,6 +56,13 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { }, } } + if alreadyExist, ok := errors.Cause(err).(*ErrKeyAlreadyExist); ok { + return &kvrpcpb.KeyError{ + AlreadyExist: &kvrpcpb.AlreadyExist{ + Key: alreadyExist.Key, + }, + } + } if retryable, ok := errors.Cause(err).(ErrRetryable); ok { return &kvrpcpb.KeyError{ Retryable: retryable.Error(), diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 063cfcc0b0d05..09376b26266f4 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -16,6 +16,7 @@ package tikv import ( "bytes" "context" + "fmt" "math" "sync" "sync/atomic" @@ -97,8 +98,12 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro mutations := make(map[string]*pb.Mutation) err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error { if len(v) > 0 { + op := pb.Op_Put + if c := txn.us.LookupConditionPair(k); c != nil && c.ShouldNotExist() { + op = pb.Op_Insert + } mutations[string(k)] = &pb.Mutation{ - Op: pb.Op_Put, + Op: op, Key: k, Value: v, } @@ -381,6 +386,18 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } var locks []*Lock for _, keyErr := range keyErrs { + // Check already exists error + if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil { + key := alreadyExist.GetKey() + conditionPair := c.txn.us.LookupConditionPair(key) + if conditionPair == nil { + panic(fmt.Sprintf("con:%d, conditionPair for key:%s should not be nil", c.connID, key)) + } + log.Debugf("con:%d key: %s already exists", c.connID, key) + return errors.Trace(conditionPair.Err()) + } + + // Extract lock from key error lock, err1 := extractLockFromKeyErr(keyErr) if err1 != nil { return errors.Trace(err1) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 33222f0778807..e0a04ccdcafe5 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -180,10 +180,6 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { start := time.Now() defer func() { metrics.TiKVTxnCmdHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds()) }() - if err := txn.us.CheckLazyConditionPairs(); err != nil { - return errors.Trace(err) - } - // connID is used for log. var connID uint64 val := ctx.Value(sessionctx.ConnID)