diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 460e39429425d..94d43c32c3041 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -32,6 +32,7 @@ type tikvTxn struct { // NewTiKVTxn returns a new Transaction. func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction { + txn.SetOption(tikvstore.KVFilter, TiDBKVFilter{}) return &tikvTxn{txn, make(map[int64]*model.TableInfo)} } @@ -141,3 +142,11 @@ func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error { } return extractKeyExistsErrFromIndex(key, value, tblInfo, indexID) } + +// TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed. +type TiDBKVFilter struct{} + +// IsUnnecessaryKeyValue defines which kinds of KV pairs from TiDB needn't be committed. +func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { + return tablecodec.IsUntouchedIndexKValue(key, value) +} diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 57bf74ba91522..973d9967471b5 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -314,6 +314,12 @@ func (c *twoPhaseCommitter) extractKeyExistsErr(err *kv.ErrKeyExist) error { return errors.Trace(err) } +// KVFilter is a filter that filters out unnecessary KV pairs. +type KVFilter interface { + // IsUnnecessaryKeyValue returns whether this KV pair should be committed. + IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) bool +} + func (c *twoPhaseCommitter) initKeysAndMutations() error { var size, putCnt, delCnt, lockCnt, checkCnt int @@ -322,6 +328,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { sizeHint := txn.us.GetMemBuffer().Len() c.mutations = newMemBufferMutations(sizeHint, memBuf) c.isPessimistic = txn.IsPessimistic() + filter := txn.getKVFilter() var err error for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() { @@ -340,10 +347,14 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { } else { value = it.Value() if len(value) > 0 { - if tablecodec.IsUntouchedIndexKValue(key, value) { + isUnnecessaryKV := filter != nil && filter.IsUnnecessaryKeyValue(key, value, flags) + if isUnnecessaryKV { if !flags.HasLocked() { continue } + // If the key was locked before, we should prewrite the lock even if + // the KV needn't be committed according to the filter. Otherwise, we + // were forgetting removing pessimistic locks added before. op = pb.Op_Lock lockCnt++ } else { diff --git a/store/tikv/kv/option.go b/store/tikv/kv/option.go index a87ddcb7214a4..e9479cf609502 100644 --- a/store/tikv/kv/option.go +++ b/store/tikv/kv/option.go @@ -59,6 +59,8 @@ const ( IsStalenessReadOnly // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels + // KVFilter filters out the key-value pairs in the memBuf that is unnecessary to be committed + KVFilter ) // Priority value for transaction priority. diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index ac323f45c59ed..95a8e9ac077ea 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" tidbkv "github.com/pingcap/tidb/kv" + drivertxn "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/mockstore/mocktikv" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" @@ -1032,6 +1033,7 @@ func (s *testCommitterSuite) TestResolvePessimisticLock(c *C) { noValueIndexKey := []byte("t00000001_i000000002") c.Assert(tablecodec.IsUntouchedIndexKValue(untouchedIndexKey, untouchedIndexValue), IsTrue) txn := s.begin(c) + txn.SetOption(kv.KVFilter, drivertxn.TiDBKVFilter{}) err := txn.Set(untouchedIndexKey, untouchedIndexValue) c.Assert(err, IsNil) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tidbkv.LockNoWait} diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 70811607cef66..8b36d431a7ae0 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -214,6 +214,13 @@ func (txn *KVTxn) IsPessimistic() bool { return txn.us.GetOption(kv.Pessimistic) != nil } +func (txn *KVTxn) getKVFilter() KVFilter { + if filter := txn.us.GetOption(kv.KVFilter); filter != nil { + return filter.(KVFilter) + } + return nil +} + // Commit commits the transaction operations to KV store. func (txn *KVTxn) Commit(ctx context.Context) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {