Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dml : cache table reads data from the original table in a new transaction #29575

Merged
merged 4 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4164,7 +4164,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
}
}()
if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt {
err := cachedTable.UpdateLockForRead(b.ctx, txn.StartTS())
err := cachedTable.UpdateLockForRead(b.ctx.GetStore(), txn.StartTS())
if err != nil {
log.Warn("Update Lock Info Error")
}
Expand Down
2 changes: 1 addition & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ type CachedTable interface {

// UpdateLockForRead If you cannot meet the conditions of the read buffer,
// you need to update the lock information and read the data from the original table
UpdateLockForRead(ctx sessionctx.Context, ts uint64) error
UpdateLockForRead(store kv.Storage, ts uint64) error
}

// CacheData pack the cache data and lease
Expand Down
64 changes: 38 additions & 26 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func leaseFromTS(ts uint64) uint64 {
return lease
}

func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) {
// Here is a trick to get a MemBuffer data, because the internal API is not exposed.
// Create a transaction with start ts 0, and take the MemBuffer out.
buffTxn, err := store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0))
if err != nil {
return nil, err
}
return buffTxn.GetMemBuffer(), nil
}

func (c *cachedTable) TryGetMemcache(ts uint64) (kv.MemBuffer, bool) {
tmp := c.cacheData.Load()
if tmp == nil {
Expand Down Expand Up @@ -78,43 +88,45 @@ func NewCachedTable(tbl *TableCommon) (table.Table, error) {
return ret, nil
}

func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context, lease uint64) (kv.MemBuffer, error) {
prefix := tablecodec.GenTablePrefix(c.tableID)
txn, err := ctx.Txn(true)
func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, error) {
buffer, err := newMemBuffer(store)
if err != nil {
return nil, err
}
if txn.StartTS() >= lease {
return nil, errors.New("the loaded data is outdate for caching")
}

buffTxn, err := ctx.GetStore().BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0))
if err != nil {
return nil, err
}

buffer := buffTxn.GetMemBuffer()
it, err := txn.Iter(prefix, prefix.PrefixNext())
if err != nil {
return nil, err
}
defer it.Close()
for it.Valid() && it.Key().HasPrefix(prefix) {
value := it.Value()
err = buffer.Set(it.Key(), value)
err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
prefix := tablecodec.GenTablePrefix(c.tableID)
if err != nil {
return nil, err
return err
}
err = it.Next()
if txn.StartTS() >= lease {
return errors.New("the loaded data is outdated for caching")
}
it, err := txn.Iter(prefix, prefix.PrefixNext())
if err != nil {
return nil, err
return err
}
defer it.Close()
for it.Valid() && it.Key().HasPrefix(prefix) {
value := it.Value()
err = buffer.Set(it.Key(), value)
if err != nil {
return err
}
err = it.Next()
if err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}

return buffer, nil
}

func (c *cachedTable) UpdateLockForRead(ctx sessionctx.Context, ts uint64) error {
func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error {
// Load data from original table and the update lock information.
tid := c.Meta().ID
lease := leaseFromTS(ts)
Expand All @@ -123,7 +135,7 @@ func (c *cachedTable) UpdateLockForRead(ctx sessionctx.Context, ts uint64) error
return errors.Trace(err)
}
if succ {
mb, err := c.loadDataFromOriginalTable(ctx, lease)
mb, err := c.loadDataFromOriginalTable(store, lease)
if err != nil {
return errors.Trace(err)
}
Expand Down
27 changes: 27 additions & 0 deletions table/tables/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,30 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) {
}
tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003"))
}

func TestCacheTableComplexRead(t *testing.T) {
t.Parallel()
store, clean := testkit.CreateMockStore(t)
defer clean()
doneCh := make(chan struct{}, 1)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("create table complex_cache (id int primary key auto_increment, u int unique, v int)")
tk1.MustExec("insert into complex_cache values" + "(5, 105, 1005), (7, 117, 1007), (9, 109, 1009)")
tk1.MustExec("alter table complex_cache cache")
tk1.MustExec("begin")
tk1.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009"))

go func() {
tk2.MustExec("begin")
tk2.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009"))
tk2.HasPlan("select *from complex_cache where id > 7", "UnionScan")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loading cache is async, will it make the test unstable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right.

tk2.MustExec("commit")
doneCh <- struct{}{}
}()
<-doneCh
tk1.HasPlan("select *from complex_cache where id > 7", "UnionScan")
tk1.MustExec("commit")
}