Skip to content

Commit

Permalink
plan,table : support write operator for cache table and mock lockwrit…
Browse files Browse the repository at this point in the history
…e state (#29444)
  • Loading branch information
JayLZhou authored Nov 8, 2021
1 parent e98d587 commit 9f68c8e
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 40 deletions.
2 changes: 2 additions & 0 deletions ddl/db_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func (s *testDBSuite2) TestAlterTableCache(c *C) {
c.Assert(terror.ErrorEqual(domain.ErrInfoSchemaChanged, err), IsTrue)
/* Test can skip schema checker */
tk.MustExec("begin")
tk.MustExec("drop table if exists t1")
tk.MustExec("CREATE TABLE t1 (a int)")
tk.MustExec("insert into t1 set a=2;")
tk2.MustExec("alter table t2 cache")
tk.MustExec("commit")
Expand Down
1 change: 1 addition & 0 deletions executor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 0 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4151,7 +4151,6 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
return nil, err
}
// Use the txn of the transaction to determine whether the cache can be read.
// About read lock and read condition feature. will add in the next pr.
buffer, cond := cachedTable.TryGetMemcache(txn.StartTS())
if cond {
b.ctx.GetSessionVars().StmtCtx.StoreCacheTable(tbl.Meta().ID, buffer)
Expand Down
6 changes: 6 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,9 @@ type CachedTable interface {
// you need to update the lock information and read the data from the original table
UpdateLockForRead(ctx sessionctx.Context, ts uint64) error
}

// CacheData pack the cache data and lease
type CacheData struct {
Lease uint64
kv.MemBuffer
}
144 changes: 111 additions & 33 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
package tables

import (
"fmt"
"sync"
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
)

Expand All @@ -30,74 +34,148 @@ var _ table.CachedTable = &cachedTable{}

type cachedTable struct {
TableCommon
kv.MemBuffer
mu sync.RWMutex
cacheData atomic.Value
handle StateRemote
}

func leaseFromTS(ts uint64) uint64 {
// TODO make this configurable in the following PRs
const defaultLeaseDuration time.Duration = 3 * time.Second
physicalTime := oracle.GetTimeFromTS(ts)
lease := oracle.GoTimeToTS(physicalTime.Add(defaultLeaseDuration))
return lease
}

func (c *cachedTable) TryGetMemcache(ts uint64) (kv.MemBuffer, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.isReadFromCache(ts) {
return c.MemBuffer, true
tmp := c.cacheData.Load()
if tmp == nil {
return nil, false
}
data := tmp.(*table.CacheData)
if data.Lease > ts {
return data.MemBuffer, true
}
return nil, false
}
func (c *cachedTable) isReadFromCache(ts uint64) bool {
// If first read cache table. directly return false, the backend goroutine will help us update the lock information
// and read the data from the original table at the same time
// TODO : Use lease and ts judge whether it is readable.
// TODO : If the cache is not readable. MemBuffer become invalid.
return c.MemBuffer != nil
}

var mockStateRemote = struct {
Ch chan remoteTask
Data *mockStateRemoteData
}{}

// NewCachedTable creates a new CachedTable Instance
func NewCachedTable(tbl *TableCommon) (table.Table, error) {
return &cachedTable{
if mockStateRemote.Data == nil {
mockStateRemote.Data = newMockStateRemoteData()
mockStateRemote.Ch = make(chan remoteTask, 100)
go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch)
}
ret := &cachedTable{
TableCommon: *tbl,
}, nil
handle: &mockStateRemoteHandle{mockStateRemote.Ch},
}

return ret, nil
}

func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context) error {
func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context, lease uint64) (kv.MemBuffer, error) {
prefix := tablecodec.GenTablePrefix(c.tableID)
txn, err := ctx.Txn(true)
if err != nil {
return err
return nil, err
}
if txn.StartTS() >= lease {
return nil, errors.New("the loaded data is outdate for caching")
}

buffTxn, err := ctx.GetStore().BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0))
if err != nil {
return err
return nil, err
}

buffer := buffTxn.GetMemBuffer()
it, err := txn.Iter(prefix, prefix.PrefixNext())
if err != nil {
return err
return nil, err
}
defer it.Close()
if !it.Valid() {
return nil
}
for it.Valid() && it.Key().HasPrefix(prefix) {
value := it.Value()
err = buffer.Set(it.Key(), value)
if err != nil {
return err
return nil, err
}
err = it.Next()
if err != nil {
return err
return nil, err
}
}
c.mu.Lock()
c.MemBuffer = buffer
c.mu.Unlock()
return nil

return buffer, nil
}

func (c *cachedTable) UpdateLockForRead(ctx sessionctx.Context, ts uint64) error {
// Now only the data is re-load here, and the lock information is not updated. any read-lock information update will in the next pr.
err := c.loadDataFromOriginalTable(ctx)
// Load data from original table and the update lock information.
tid := c.Meta().ID
lease := leaseFromTS(ts)
succ, err := c.handle.LockForRead(tid, ts, lease)
if err != nil {
return fmt.Errorf("reload data error")
return errors.Trace(err)
}
if succ {
mb, err := c.loadDataFromOriginalTable(ctx, lease)
if err != nil {
return errors.Trace(err)
}

c.cacheData.Store(&table.CacheData{
Lease: lease,
MemBuffer: mb,
})
}
// Current status is not suitable to cache.
return nil
}

// AddRecord implements the AddRecord method for the table.Table interface.
func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
txn, err := ctx.Txn(true)
if err != nil {
return nil, err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return nil, errors.Trace(err)
}
return c.TableCommon.AddRecord(ctx, r, opts...)

}

// UpdateRecord implements table.Table
func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
txn, err := sctx.Txn(true)
if err != nil {
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched)
}

// RemoveRecord implements table.Table RemoveRecord interface.
func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error {
txn, err := ctx.Txn(true)
if err != nil {
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
return c.TableCommon.RemoveRecord(ctx, h, r)
}
46 changes: 41 additions & 5 deletions table/tables/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,47 @@ func TestCacheCondition(t *testing.T) {

// Normal query should trigger cache.
tk.MustQuery("select * from t2")
var i int
for ; i < 10; i++ {
if tk.HasPlan("select * from t2 where id>0", "UnionScan") {
return
for !tk.HasPlan("select * from t2 where id>0", "UnionScan") {
tk.MustExec("select * from t2")
}
}

func TestCacheTableBasicReadAndWrite(t *testing.T) {
t.Parallel()
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk.MustExec("drop table if exists write_tmp1")
tk.MustExec("create table write_tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into write_tmp1 values" +
"(1, 101, 1001), (3, 113, 1003)",
)

tk.MustExec("alter table write_tmp1 cache")
// Read and add read lock
tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001",
"3 113 1003"))
// read lock should valid
for i := 0; i < 10; i++ {
if tk.HasPlan("select *from write_tmp1", "UnionScan") {
break
}
}
require.True(t, i < 10)
tk.MustExec("use test")
tk1.MustExec("insert into write_tmp1 values (2, 222, 222)")
// write lock exists
require.False(t, tk.HasPlan("select *from write_tmp1", "UnionScan"))
// wait write lock expire and check cache can be used again
for !tk.HasPlan("select *from write_tmp1", "UnionScan") {
tk.MustExec("select *from write_tmp1")
}
tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003"))
tk1.MustExec("update write_tmp1 set v = 3333 where id = 2")
for !tk.HasPlan("select *from write_tmp1", "UnionScan") {
tk.MustExec("select *from write_tmp1")
}
tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003"))
}
2 changes: 1 addition & 1 deletion table/tables/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (

func TestMain(m *testing.M) {
testbridge.WorkaroundGoCheckFlags()

opts := []goleak.Option{
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"),
}
goleak.VerifyTestMain(m, opts...)
}
Loading

0 comments on commit 9f68c8e

Please sign in to comment.