Skip to content

Commit

Permalink
kv: remove UnionStore interface (#24625)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored May 13, 2021
1 parent 5d40ea4 commit b7c22aa
Show file tree
Hide file tree
Showing 17 changed files with 54 additions and 124 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
// we fetch records row by row, so records will belong to
// index[0], index[1] ... index[n-1], index[0], index[1] ...
// respectively. So indexes[i%n] is the index of idxRecords[i].
err := w.indexes[i%n].Delete(w.sessCtx.GetSessionVars().StmtCtx, txn.GetUnionStore(), idxRecord.vals, idxRecord.handle)
err := w.indexes[i%n].Delete(w.sessCtx.GetSessionVars().StmtCtx, txn, idxRecord.vals, idxRecord.handle)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func checkIndexExists(ctx sessionctx.Context, tbl table.Table, indexValue interf
if err != nil {
return errors.Trace(err)
}
doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, txn.GetUnionStore(), types.MakeDatums(indexValue), kv.IntHandle(handle))
doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, txn, types.MakeDatums(indexValue), kv.IntHandle(handle))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri
return errors.Trace(errors.Errorf("batch keys are inconsistent with handles"))
}
for _, handleIdxVals := range handleIdxValsGroup.([][]types.Datum) {
if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn.GetUnionStore(), handleIdxVals, handle); err != nil {
if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, handleIdxVals, handle); err != nil {
return err
}
e.removeCnt++
Expand Down
44 changes: 22 additions & 22 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *testSuite5) TestAdminRecoverIndex(c *C) {
sc := s.ctx.GetSessionVars().StmtCtx
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(1), kv.IntHandle(1))
err = indexOpr.Delete(sc, txn, types.MakeDatums(1), kv.IntHandle(1))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -158,7 +158,7 @@ func (s *testSuite5) TestAdminRecoverIndex(c *C) {

txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(10), kv.IntHandle(10))
err = indexOpr.Delete(sc, txn, types.MakeDatums(10), kv.IntHandle(10))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -172,15 +172,15 @@ func (s *testSuite5) TestAdminRecoverIndex(c *C) {

txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(1), kv.IntHandle(1))
err = indexOpr.Delete(sc, txn, types.MakeDatums(1), kv.IntHandle(1))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(2), kv.IntHandle(2))
err = indexOpr.Delete(sc, txn, types.MakeDatums(2), kv.IntHandle(2))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(3), kv.IntHandle(3))
err = indexOpr.Delete(sc, txn, types.MakeDatums(3), kv.IntHandle(3))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(10), kv.IntHandle(10))
err = indexOpr.Delete(sc, txn, types.MakeDatums(10), kv.IntHandle(10))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(20), kv.IntHandle(20))
err = indexOpr.Delete(sc, txn, types.MakeDatums(20), kv.IntHandle(20))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *testSuite5) TestClusteredIndexAdminRecoverIndex(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
cHandle := testutil.MustNewCommonHandle(c, "1", "3")
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(2), cHandle)
err = indexOpr.Delete(sc, txn, types.MakeDatums(2), cHandle)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -269,7 +269,7 @@ func (s *testSuite5) TestAdminRecoverPartitionTableIndex(c *C) {
sc := s.ctx.GetSessionVars().StmtCtx
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(idxValue), kv.IntHandle(idxValue))
err = indexOpr.Delete(sc, txn, types.MakeDatums(idxValue), kv.IntHandle(idxValue))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -345,13 +345,13 @@ func (s *testSuite5) TestAdminRecoverIndex1(c *C) {

txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums("1"), kv.IntHandle(1))
err = indexOpr.Delete(sc, txn, types.MakeDatums("1"), kv.IntHandle(1))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums("2"), kv.IntHandle(2))
err = indexOpr.Delete(sc, txn, types.MakeDatums("2"), kv.IntHandle(2))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums("3"), kv.IntHandle(3))
err = indexOpr.Delete(sc, txn, types.MakeDatums("3"), kv.IntHandle(3))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums("10"), kv.IntHandle(4))
err = indexOpr.Delete(sc, txn, types.MakeDatums("10"), kv.IntHandle(4))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -745,7 +745,7 @@ func (s *testSuite3) TestAdminCheckPartitionTableFailed(c *C) {
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(i), kv.IntHandle(i))
err = indexOpr.Delete(sc, txn, types.MakeDatums(i), kv.IntHandle(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -784,7 +784,7 @@ func (s *testSuite3) TestAdminCheckPartitionTableFailed(c *C) {
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(i+8), kv.IntHandle(i+8))
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), kv.IntHandle(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -807,7 +807,7 @@ func (s *testSuite3) TestAdminCheckPartitionTableFailed(c *C) {
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(i+8), kv.IntHandle(i))
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), kv.IntHandle(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -842,7 +842,7 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
// Index c2 is missing 11.
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(-10), kv.IntHandle(-1))
err = indexOpr.Delete(sc, txn, types.MakeDatums(-10), kv.IntHandle(-1))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -873,7 +873,7 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
// Index c2 has two more values than table data: 10, 13, and these handles have correlative record.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(0), kv.IntHandle(0))
err = indexOpr.Delete(sc, txn, types.MakeDatums(0), kv.IntHandle(0))
c.Assert(err, IsNil)
// Make sure the index value "19" is smaller "21". Then we scan to "19" before "21".
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(19), kv.IntHandle(10), nil)
Expand All @@ -890,9 +890,9 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
// Two indices have the same handle.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(13), kv.IntHandle(2))
err = indexOpr.Delete(sc, txn, types.MakeDatums(13), kv.IntHandle(2))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(12), kv.IntHandle(2))
err = indexOpr.Delete(sc, txn, types.MakeDatums(12), kv.IntHandle(2))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -906,7 +906,7 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(12), kv.IntHandle(2), nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(20), kv.IntHandle(10))
err = indexOpr.Delete(sc, txn, types.MakeDatums(20), kv.IntHandle(10))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -917,7 +917,7 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
// Recover records.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn.GetUnionStore(), types.MakeDatums(19), kv.IntHandle(10))
err = indexOpr.Delete(sc, txn, types.MakeDatums(19), kv.IntHandle(10))
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(20), kv.IntHandle(10), nil)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *testSuite3) TestInconsistentIndex(c *C) {
for i := 0; i < 10; i++ {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = idxOp.Delete(ctx.GetSessionVars().StmtCtx, txn.GetUnionStore(), types.MakeDatums(i+10), kv.IntHandle(100+i))
err = idxOp.Delete(ctx.GetSessionVars().StmtCtx, txn, types.MakeDatums(i+10), kv.IntHandle(100+i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3852,9 +3852,9 @@ func (s *testSuite) TestCheckIndex(c *C) {
// table data (handle, data): (1, 10), (2, 20), (4, 40)
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = idx.Delete(sc, txn.GetUnionStore(), types.MakeDatums(int64(30)), kv.IntHandle(3))
err = idx.Delete(sc, txn, types.MakeDatums(int64(30)), kv.IntHandle(3))
c.Assert(err, IsNil)
err = idx.Delete(sc, txn.GetUnionStore(), types.MakeDatums(int64(20)), kv.IntHandle(2))
err = idx.Delete(sc, txn, types.MakeDatums(int64(20)), kv.IntHandle(2))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
4 changes: 0 additions & 4 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ func (t *mockTxn) GetSnapshot() Snapshot {
return nil
}

func (t *mockTxn) GetUnionStore() UnionStore {
return nil
}

func (t *mockTxn) NewStagingBuffer() MemBuffer {
return nil
}
Expand Down
2 changes: 0 additions & 2 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ type Transaction interface {
GetMemBuffer() MemBuffer
// GetSnapshot returns the Snapshot binding to this transaction.
GetSnapshot() Snapshot
// GetUnionStore returns the UnionStore binding to this transaction.
GetUnionStore() UnionStore
// SetVars sets variables to the transaction.
SetVars(vars interface{})
// GetVars gets variables from the transaction.
Expand Down
17 changes: 1 addition & 16 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,6 @@ package kv

// UnionStore is a store that wraps a snapshot for read and a MemBuffer for buffered write.
// Also, it provides some transaction related utilities.
// TODO: Remove after upgrading BR.
type UnionStore interface {
Retriever

// HasPresumeKeyNotExists returns whether the key presumed key not exists error for the lazy check.
HasPresumeKeyNotExists(k Key) bool
// UnmarkPresumeKeyNotExists deletes the key presume key not exists error flag for the lazy check.
UnmarkPresumeKeyNotExists(k Key)

// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt int, val interface{})
// DelOption deletes an option.
DelOption(opt int)
// GetOption gets an option.
GetOption(opt int) interface{}
// GetMemBuffer return the MemBuffer binding to this unionStore.
GetMemBuffer() MemBuffer
}
4 changes: 0 additions & 4 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer {
return newMemBuffer(txn.KVTxn.GetMemBuffer())
}

func (txn *tikvTxn) GetUnionStore() kv.UnionStore {
return &tikvUnionStore{txn.KVTxn.GetUnionStore()}
}

func (txn *tikvTxn) SetOption(opt int, val interface{}) {
switch opt {
case kv.BinlogInfo:
Expand Down
36 changes: 0 additions & 36 deletions store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,42 +111,6 @@ func (m *memBuffer) SnapshotGetter() kv.Getter {
return newKVGetter(m.MemDB.SnapshotGetter())
}

//tikvUnionStore implements kv.UnionStore
type tikvUnionStore struct {
*unionstore.KVUnionStore
}

func (u *tikvUnionStore) GetMemBuffer() kv.MemBuffer {
return newMemBuffer(u.KVUnionStore.GetMemBuffer())
}

func (u *tikvUnionStore) Get(ctx context.Context, k kv.Key) ([]byte, error) {
data, err := u.KVUnionStore.Get(ctx, k)
return data, derr.ToTiDBErr(err)
}

func (u *tikvUnionStore) HasPresumeKeyNotExists(k kv.Key) bool {
return u.KVUnionStore.HasPresumeKeyNotExists(k)
}

func (u *tikvUnionStore) UnmarkPresumeKeyNotExists(k kv.Key) {
u.KVUnionStore.UnmarkPresumeKeyNotExists(k)
}

func (u *tikvUnionStore) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
it, err := u.KVUnionStore.Iter(k, upperBound)
return newKVIterator(it), derr.ToTiDBErr(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
// TODO: Add lower bound limit
func (u *tikvUnionStore) IterReverse(k kv.Key) (kv.Iterator, error) {
it, err := u.KVUnionStore.IterReverse(k)
return newKVIterator(it), derr.ToTiDBErr(err)
}

type tikvGetter struct {
unionstore.Getter
}
Expand Down
6 changes: 3 additions & 3 deletions table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ type Index interface {
// Create supports insert into statement.
Create(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error)
// Delete supports delete from statement.
Delete(sc *stmtctx.StatementContext, us kv.UnionStore, indexedValues []types.Datum, h kv.Handle) error
Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error
// Drop supports drop table, drop index statements.
Drop(us kv.UnionStore) error
Drop(txn kv.Transaction) error
// Exist supports check index exists or not.
Exist(sc *stmtctx.StatementContext, us kv.UnionStore, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error)
Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error)
// GenIndexKey generates an index key.
GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error)
// Seek supports where clause.
Expand Down
27 changes: 13 additions & 14 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
return nil, err
}

us := txn.GetUnionStore()
if !distinct || skipCheck || opt.Untouched {
err = us.GetMemBuffer().Set(key, idxVal)
err = txn.GetMemBuffer().Set(key, idxVal)
return nil, err
}

Expand All @@ -202,18 +201,18 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue

var value []byte
if sctx.GetSessionVars().LazyCheckKeyNotExists() {
value, err = us.GetMemBuffer().Get(ctx, key)
value, err = txn.GetMemBuffer().Get(ctx, key)
} else {
value, err = us.Get(ctx, key)
value, err = txn.Get(ctx, key)
}
if err != nil && !kv.IsErrNotFound(err) {
return nil, err
}
if err != nil || len(value) == 0 {
if sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil {
err = us.GetMemBuffer().SetWithFlags(key, idxVal, tikvstore.SetPresumeKeyNotExists)
err = txn.GetMemBuffer().SetWithFlags(key, idxVal, tikvstore.SetPresumeKeyNotExists)
} else {
err = us.GetMemBuffer().Set(key, idxVal)
err = txn.GetMemBuffer().Set(key, idxVal)
}
return nil, err
}
Expand All @@ -226,22 +225,22 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}

// Delete removes the entry for handle h and indexedValues from KV index.
func (c *index) Delete(sc *stmtctx.StatementContext, us kv.UnionStore, indexedValues []types.Datum, h kv.Handle) error {
func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
if err != nil {
return err
}
if distinct {
err = us.GetMemBuffer().DeleteWithFlags(key, tikvstore.SetNeedLocked)
err = txn.GetMemBuffer().DeleteWithFlags(key, tikvstore.SetNeedLocked)
} else {
err = us.GetMemBuffer().Delete(key)
err = txn.GetMemBuffer().Delete(key)
}
return err
}

// Drop removes the KV index from store.
func (c *index) Drop(us kv.UnionStore) error {
it, err := us.Iter(c.prefix, c.prefix.PrefixNext())
func (c *index) Drop(txn kv.Transaction) error {
it, err := txn.Iter(c.prefix, c.prefix.PrefixNext())
if err != nil {
return err
}
Expand All @@ -252,7 +251,7 @@ func (c *index) Drop(us kv.UnionStore) error {
if !it.Key().HasPrefix(c.prefix) {
break
}
err := us.GetMemBuffer().Delete(it.Key())
err := txn.GetMemBuffer().Delete(it.Key())
if err != nil {
return err
}
Expand Down Expand Up @@ -298,13 +297,13 @@ func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error)
return &indexIter{it: it, idx: c, prefix: c.prefix, colInfos: colInfos, tps: tps}, nil
}

func (c *index) Exist(sc *stmtctx.StatementContext, us kv.UnionStore, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
if err != nil {
return false, nil, err
}

value, err := us.Get(context.TODO(), key)
value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
}
Expand Down
Loading

0 comments on commit b7c22aa

Please sign in to comment.