Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push down condition check to TiKV for prewrite request #9127

Merged
merged 35 commits into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e1cf7c3
push down exist check
zhangjinpeng87 Jan 18, 2019
f2d57cc
push down precondition for prewrite
zhangjinpeng87 Jan 21, 2019
d0d96dc
handle already exist error
zhangjinpeng87 Jan 21, 2019
b8ae2ea
push down precondition for prewrite
zhangjinpeng87 Jan 21, 2019
d2a9fa8
merge master
zhangjinpeng87 Jan 21, 2019
b3169da
use go-1.11.3
zhangjinpeng87 Jan 21, 2019
775bf27
update go.sum
zhangjinpeng87 Jan 21, 2019
0cbd678
update kvproto
zhangjinpeng87 Jan 23, 2019
ec363ef
update kvproto
zhangjinpeng87 Feb 14, 2019
9527f70
merge master
zhangjinpeng87 Feb 14, 2019
043d3bb
rename GetPreconditionErr to LookupConditionErr
zhangjinpeng87 Feb 14, 2019
0eb5a03
Merge remote-tracking branch 'pingcap-tidb/master' into prewrite-not-…
zhangjinpeng87 Feb 14, 2019
e0c9740
update kvproto
zhangjinpeng87 Feb 14, 2019
1094455
update go.sum
zhangjinpeng87 Feb 14, 2019
db8501d
rm parser.go
zhangjinpeng87 Feb 14, 2019
66ad56e
update go.sum
zhangjinpeng87 Feb 14, 2019
197c3b0
update go.sum
zhangjinpeng87 Feb 14, 2019
d103cf8
format code
zhangjinpeng87 Feb 14, 2019
de56fc6
fix test
zhangjinpeng87 Feb 14, 2019
6254259
update go.sum
zhangjinpeng87 Feb 14, 2019
a54be30
handle insert opration in mvcc_leveldb
zhangjinpeng87 Feb 15, 2019
79cbc39
fix test
zhangjinpeng87 Feb 15, 2019
a983394
address comments
zhangjinpeng87 Feb 15, 2019
2f6cc45
update go.sum
zhangjinpeng87 Feb 15, 2019
c25f75e
address comments
zhangjinpeng87 Feb 16, 2019
21deaad
udpate kvproto
zhangjinpeng87 Feb 16, 2019
5ff88e5
merge master
zhangjinpeng87 Feb 19, 2019
2d4b688
update kvproto
zhangjinpeng87 Feb 19, 2019
7148ea3
Merge branch 'master' into prewrite-not-exist
zhangjinpeng87 Feb 19, 2019
08bcdd3
address comments
zhangjinpeng87 Feb 19, 2019
e9e9616
Merge branch 'prewrite-not-exist' of github.com:zhangjinpeng1987/tidb…
zhangjinpeng87 Feb 19, 2019
33da4fc
address comments
zhangjinpeng87 Feb 19, 2019
ace4c40
Merge branch 'master' into prewrite-not-exist
zhangjinpeng87 Feb 19, 2019
173fa7e
Merge branch 'master' into prewrite-not-exist
zhangjinpeng87 Feb 19, 2019
52c900a
Merge branch 'master' into prewrite-not-exist
tiancaiamao Feb 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/pingcap/errors v0.11.0
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32
github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7
github.com/pingcap/parser v0.0.0-20190218033509-9545f168ae97
github.com/pingcap/pd v2.1.0-rc.4+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32 h1:9uwqk2DvsAKImRKYAjERMuIf5ZiCcNFhaFhgFRXw7X0=
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562 h1:32oF1/8lVnBR2JVcCAnKPQATTOX0+ckRDFpjQk4Ngno=
github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/parser v0.0.0-20190218033509-9545f168ae97 h1:GIhPQAwFwnf6cSdVYXdSNkx171Nl9ZmIVYrOtN3I2lw=
Expand Down
42 changes: 13 additions & 29 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@
package kv

import (
"bytes"

"github.com/pingcap/errors"
)

// UnionStore is a store that wraps a snapshot for read and a BufferStore for buffered write.
// Also, it provides some transaction related utilities.
type UnionStore interface {
MemBuffer
// CheckLazyConditionPairs loads all lazy values from store then checks if all values are matched.
// Lazy condition pairs should be checked before transaction commit.
CheckLazyConditionPairs() error
// Returns related condition pair
LookupConditionPair(k Key) *conditionPair
// WalkBuffer iterates all buffered kv pairs.
WalkBuffer(f func(k Key, v []byte) error) error
// SetOption sets an option with a value, when val is nil, uses the default
Expand Down Expand Up @@ -56,6 +53,14 @@ type conditionPair struct {
err error
}

func (c *conditionPair) ShouldNotExist() bool {
return len(c.value) == 0
}

func (c *conditionPair) Err() error {
return c.err
}

// unionStore is an in-memory Store which contains a buffer for write and a
// snapshot for read.
type unionStore struct {
Expand Down Expand Up @@ -201,30 +206,9 @@ func (us *unionStore) markLazyConditionPair(k Key, v []byte, e error) {
}
}

// CheckLazyConditionPairs implements the UnionStore interface.
func (us *unionStore) CheckLazyConditionPairs() error {
if len(us.lazyConditionPairs) == 0 {
return nil
}
keys := make([]Key, 0, len(us.lazyConditionPairs))
for _, v := range us.lazyConditionPairs {
keys = append(keys, v.key)
}
values, err := us.snapshot.BatchGet(keys)
if err != nil {
return errors.Trace(err)
}

for k, v := range us.lazyConditionPairs {
if len(v.value) == 0 {
if _, exist := values[k]; exist {
return errors.Trace(v.err)
}
} else {
if !bytes.Equal(values[k], v.value) {
return errors.Trace(ErrLazyConditionPairsNotMatch)
}
}
func (us *unionStore) LookupConditionPair(k Key) *conditionPair {
disksing marked this conversation as resolved.
Show resolved Hide resolved
if c, ok := us.lazyConditionPairs[string(k)]; ok {
return c
}
return nil
}
Expand Down
11 changes: 9 additions & 2 deletions kv/union_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,15 @@ func (s *testUnionStoreSuite) TestLazyConditionCheck(c *C) {
_, err = s.us.Get([]byte("2"))
c.Assert(terror.ErrorEqual(err, ErrNotExist), IsTrue, Commentf("err %v", err))

err = s.us.CheckLazyConditionPairs()
c.Assert(err, NotNil)
condionPair1 := s.us.LookupConditionPair([]byte("1"))
c.Assert(condionPair1, IsNil)

condionPair2 := s.us.LookupConditionPair([]byte("2"))
c.Assert(condionPair2, NotNil)
c.Assert(condionPair2.ShouldNotExist(), IsTrue)

err2 := s.us.LookupConditionPair([]byte("2")).Err()
c.Assert(terror.ErrorEqual(err2, ErrNotExist), IsTrue, Commentf("err %v", err2))
}

func checkIterator(c *C, iter Iterator, keys [][]byte, values [][]byte) {
Expand Down
10 changes: 10 additions & 0 deletions store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ func (e *ErrLocked) Error() string {
return fmt.Sprintf("key is locked, key: %q, primary: %q, startTS: %v", e.Key, e.Primary, e.StartTS)
}

// ErrKeyAlreadyExist is returned when key exists but this key has a constraint that
// it should not exist. Client should return duplicated entry error.
type ErrKeyAlreadyExist struct {
Key []byte
}

func (e *ErrKeyAlreadyExist) Error() string {
return fmt.Sprintf("key already exist, key: %q", e.Key)
}

// ErrRetryable suggests that client may restart the txn. e.g. write conflict.
type ErrRetryable string

Expand Down
26 changes: 24 additions & 2 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,25 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
batch := &leveldb.Batch{}
errs := make([]error, 0, len(mutations))
for _, m := range mutations {
err := prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl)
// If the operation is Insert, check if key is exists at first.
var err error
if m.GetOp() == kvrpcpb.Op_Insert {
v, err := mvcc.getValue(m.Key, startTS, kvrpcpb.IsolationLevel_SI)
if err != nil {
errs = append(errs, err)
anyError = true
continue
}
if v != nil {
err = &ErrKeyAlreadyExist{
Key: m.Key,
}
errs = append(errs, err)
anyError = true
continue
}
}
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl)
errs = append(errs, err)
if err != nil {
anyError = true
Expand Down Expand Up @@ -554,11 +572,15 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
return ErrRetryable("write conflict")
}

op := mutation.GetOp()
if op == kvrpcpb.Op_Insert {
op = kvrpcpb.Op_Put
}
lock := mvccLock{
startTS: startTS,
primary: primary,
value: mutation.Value,
op: mutation.GetOp(),
op: op,
ttl: ttl,
}
writeKey := mvccEncode(mutation.Key, lockVer)
Expand Down
7 changes: 7 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
},
}
}
if alreadyExist, ok := errors.Cause(err).(*ErrKeyAlreadyExist); ok {
return &kvrpcpb.KeyError{
AlreadyExist: &kvrpcpb.AlreadyExist{
Key: alreadyExist.Key,
},
}
}
if retryable, ok := errors.Cause(err).(ErrRetryable); ok {
return &kvrpcpb.KeyError{
Retryable: retryable.Error(),
Expand Down
19 changes: 18 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync/atomic"
"time"

"fmt"
disksing marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -97,8 +98,12 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
mutations := make(map[string]*pb.Mutation)
err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error {
if len(v) > 0 {
op := pb.Op_Put
if c := txn.us.LookupConditionPair(k); c != nil && c.ShouldNotExist() {
op = pb.Op_Insert
}
mutations[string(k)] = &pb.Mutation{
Op: pb.Op_Put,
Op: op,
Key: k,
Value: v,
}
Expand Down Expand Up @@ -381,6 +386,18 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}
var locks []*Lock
for _, keyErr := range keyErrs {
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
key := alreadyExist.GetKey()
conditionPair := c.txn.us.LookupConditionPair(key)
if conditionPair == nil {
panic(fmt.Sprintf("con:%d, conditionPair for key:%s should not be nil", c.connID, key))
}
log.Errorf("con:%d key: %s already exists", c.connID, key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why log error here? key already exists is a normal business logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to log duplicated entry error in TiDB.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with tiancaiamao, if we log the duplicated error, there will be too many useless logs.

return errors.Trace(conditionPair.Err())
}

// Extract lock from key error
lock, err1 := extractLockFromKeyErr(keyErr)
if err1 != nil {
return errors.Trace(err1)
Expand Down
4 changes: 0 additions & 4 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
start := time.Now()
defer func() { metrics.TiKVTxnCmdHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds()) }()

if err := txn.us.CheckLazyConditionPairs(); err != nil {
return errors.Trace(err)
}

// connID is used for log.
var connID uint64
val := ctx.Value(sessionctx.ConnID)
Expand Down