Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: define KVFilter for customized mutation initialization #24021

Merged
merged 16 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type tikvTxn struct {

// NewTiKVTxn returns a new Transaction.
func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction {
txn.SetOption(tikvstore.KVFilter, TiDBKVFilter{})
return &tikvTxn{txn, make(map[int64]*model.TableInfo)}
}

Expand Down Expand Up @@ -141,3 +142,11 @@ func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error {
}
return extractKeyExistsErrFromIndex(key, value, tblInfo, indexID)
}

// TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed.
type TiDBKVFilter struct{}

// IsUnnecessaryKeyValue defines which kinds of KV pairs from TiDB needn't be committed.
func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool {
return tablecodec.IsUntouchedIndexKValue(key, value)
}
13 changes: 12 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ func (c *twoPhaseCommitter) extractKeyExistsErr(err *kv.ErrKeyExist) error {
return errors.Trace(err)
}

// KVFilter is a filter that filters out unnecessary KV pairs.
type KVFilter interface {
// IsUnnecessaryKeyValue returns whether this KV pair should be committed.
IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) bool
}

func (c *twoPhaseCommitter) initKeysAndMutations() error {
var size, putCnt, delCnt, lockCnt, checkCnt int

Expand All @@ -322,6 +328,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
sizeHint := txn.us.GetMemBuffer().Len()
c.mutations = newMemBufferMutations(sizeHint, memBuf)
c.isPessimistic = txn.IsPessimistic()
filter := txn.getKVFilter()

var err error
for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
Expand All @@ -340,10 +347,14 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
} else {
value = it.Value()
if len(value) > 0 {
if tablecodec.IsUntouchedIndexKValue(key, value) {
isUnnecessaryKV := filter != nil && filter.IsUnnecessaryKeyValue(key, value, flags)
if isUnnecessaryKV {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only un-empty value need the filter, so could we move the filter logic in L350~L351?

if !flags.HasLocked() {
continue
}
// If the key was locked before, we should prewrite the lock even if
// the KV needn't be committed according to the filter. Otherwise, we
// were forgetting removing pessimistic locks added before.
op = pb.Op_Lock
lockCnt++
} else {
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
// KVFilter filters out the key-value pairs in the memBuf that is unnecessary to be committed
KVFilter
)

// Priority value for transaction priority.
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
drivertxn "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
Expand Down Expand Up @@ -1032,6 +1033,7 @@ func (s *testCommitterSuite) TestResolvePessimisticLock(c *C) {
noValueIndexKey := []byte("t00000001_i000000002")
c.Assert(tablecodec.IsUntouchedIndexKValue(untouchedIndexKey, untouchedIndexValue), IsTrue)
txn := s.begin(c)
txn.SetOption(kv.KVFilter, drivertxn.TiDBKVFilter{})
err := txn.Set(untouchedIndexKey, untouchedIndexValue)
c.Assert(err, IsNil)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tidbkv.LockNoWait}
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ func (txn *KVTxn) IsPessimistic() bool {
return txn.us.GetOption(kv.Pessimistic) != nil
}

func (txn *KVTxn) getKVFilter() KVFilter {
if filter := txn.us.GetOption(kv.KVFilter); filter != nil {
return filter.(KVFilter)
}
return nil
}

// Commit commits the transaction operations to KV store.
func (txn *KVTxn) Commit(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down