From f8f5eaa710c1091a7441255cdad18e7350b35a78 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 27 Apr 2021 00:27:22 +0800 Subject: [PATCH] store/tikv: remove kvfilter option Signed-off-by: disksing --- store/driver/txn/txn_driver.go | 2 +- store/tikv/2pc.go | 2 +- store/tikv/kv/option.go | 2 -- store/tikv/tests/2pc_test.go | 2 +- store/tikv/txn.go | 16 +++++++--------- 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 3cf800eaaddd5..b385bd735c6bc 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -33,7 +33,7 @@ type tikvTxn struct { // NewTiKVTxn returns a new Transaction. func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction { - txn.SetOption(tikvstore.KVFilter, TiDBKVFilter{}) + txn.SetKVFilter(TiDBKVFilter{}) entryLimit := atomic.LoadUint64(&kv.TxnEntrySizeLimit) totalLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index cecd0b7855c91..a35001eb93ae1 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -326,7 +326,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { sizeHint := txn.us.GetMemBuffer().Len() c.mutations = newMemBufferMutations(sizeHint, memBuf) c.isPessimistic = txn.IsPessimistic() - filter := txn.getKVFilter() + filter := txn.kvFilter var err error for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() { diff --git a/store/tikv/kv/option.go b/store/tikv/kv/option.go index e9479cf609502..a87ddcb7214a4 100644 --- a/store/tikv/kv/option.go +++ b/store/tikv/kv/option.go @@ -59,8 +59,6 @@ const ( IsStalenessReadOnly // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels - // KVFilter filters out the key-value pairs in the memBuf that is unnecessary to be committed - KVFilter ) // Priority value for transaction priority. diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index b89e170392dba..8e7a35b2fd6c9 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -1031,7 +1031,7 @@ func (s *testCommitterSuite) TestResolvePessimisticLock(c *C) { untouchedIndexValue := []byte{0, 0, 0, 0, 0, 0, 0, 1, 49} noValueIndexKey := []byte("t00000001_i000000002") txn := s.begin(c) - txn.SetOption(kv.KVFilter, drivertxn.TiDBKVFilter{}) + txn.SetKVFilter(drivertxn.TiDBKVFilter{}) err := txn.Set(untouchedIndexKey, untouchedIndexValue) c.Assert(err, IsNil) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait} diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 202e38c309f83..51a58d6cb03d8 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -71,8 +71,8 @@ type KVTxn struct { schemaAmender SchemaAmender // commitCallback is called after current transaction gets committed commitCallback func(info tidbkv.TxnInfo, err error) - - binlog BinlogExecutor + binlog BinlogExecutor + kvFilter KVFilter } func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) { @@ -213,18 +213,16 @@ func (txn *KVTxn) DelOption(opt int) { txn.us.DelOption(opt) } +// SetKVFilter sets the filter to ignore key-values in memory buffer. +func (txn *KVTxn) SetKVFilter(filter KVFilter) { + txn.kvFilter = filter +} + // IsPessimistic returns true if it is pessimistic. func (txn *KVTxn) IsPessimistic() bool { return txn.us.GetOption(kv.Pessimistic) != nil } -func (txn *KVTxn) getKVFilter() KVFilter { - if filter := txn.us.GetOption(kv.KVFilter); filter != nil { - return filter.(KVFilter) - } - return nil -} - // Commit commits the transaction operations to KV store. func (txn *KVTxn) Commit(ctx context.Context) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {