From 9f68c8e92a994e4790bfd9e567e5ad86c8daa861 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Mon, 8 Nov 2021 16:11:04 +0800 Subject: [PATCH] plan,table : support write operator for cache table and mock lockwrite state (#29444) --- ddl/db_cache_test.go | 2 + executor/main_test.go | 1 + planner/core/logical_plan_builder.go | 1 - table/table.go | 6 + table/tables/cache.go | 144 ++++++++++---- table/tables/cache_test.go | 46 ++++- table/tables/main_test.go | 2 +- table/tables/state_remote.go | 278 +++++++++++++++++++++++++++ 8 files changed, 440 insertions(+), 40 deletions(-) create mode 100644 table/tables/state_remote.go diff --git a/ddl/db_cache_test.go b/ddl/db_cache_test.go index d0a29b280a7ef..bc51630e926c7 100644 --- a/ddl/db_cache_test.go +++ b/ddl/db_cache_test.go @@ -48,6 +48,8 @@ func (s *testDBSuite2) TestAlterTableCache(c *C) { c.Assert(terror.ErrorEqual(domain.ErrInfoSchemaChanged, err), IsTrue) /* Test can skip schema checker */ tk.MustExec("begin") + tk.MustExec("drop table if exists t1") + tk.MustExec("CREATE TABLE t1 (a int)") tk.MustExec("insert into t1 set a=2;") tk2.MustExec("alter table t2 cache") tk.MustExec("commit") diff --git a/executor/main_test.go b/executor/main_test.go index 70f715e120f2f..dd7d11b3c8274 100644 --- a/executor/main_test.go +++ b/executor/main_test.go @@ -44,6 +44,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), + goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } goleak.VerifyTestMain(m, opts...) 
} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 16045eab219db..67d856479d755 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4151,7 +4151,6 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as return nil, err } // Use the txn of the transaction to determine whether the cache can be read. - // About read lock and read condition feature. will add in the next pr. buffer, cond := cachedTable.TryGetMemcache(txn.StartTS()) if cond { b.ctx.GetSessionVars().StmtCtx.StoreCacheTable(tbl.Meta().ID, buffer) diff --git a/table/table.go b/table/table.go index afa2a768bf01c..136d1c926638b 100644 --- a/table/table.go +++ b/table/table.go @@ -260,3 +260,9 @@ type CachedTable interface { // you need to update the lock information and read the data from the original table UpdateLockForRead(ctx sessionctx.Context, ts uint64) error } + +// CacheData pack the cache data and lease +type CacheData struct { + Lease uint64 + kv.MemBuffer +} diff --git a/table/tables/cache.go b/table/tables/cache.go index a7986b76798bd..e9910ea232d3a 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -15,13 +15,17 @@ package tables import ( - "fmt" - "sync" + "context" + "sync/atomic" + "time" + "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" ) @@ -30,74 +34,148 @@ var _ table.CachedTable = &cachedTable{} type cachedTable struct { TableCommon - kv.MemBuffer - mu sync.RWMutex + cacheData atomic.Value + handle StateRemote +} + +func leaseFromTS(ts uint64) uint64 { + // TODO make this configurable in the following PRs + const defaultLeaseDuration time.Duration = 3 * time.Second + physicalTime := oracle.GetTimeFromTS(ts) + lease := 
oracle.GoTimeToTS(physicalTime.Add(defaultLeaseDuration)) + return lease } func (c *cachedTable) TryGetMemcache(ts uint64) (kv.MemBuffer, bool) { - c.mu.RLock() - defer c.mu.RUnlock() - if c.isReadFromCache(ts) { - return c.MemBuffer, true + tmp := c.cacheData.Load() + if tmp == nil { + return nil, false + } + data := tmp.(*table.CacheData) + if data.Lease > ts { + return data.MemBuffer, true } return nil, false } -func (c *cachedTable) isReadFromCache(ts uint64) bool { - // If first read cache table. directly return false, the backend goroutine will help us update the lock information - // and read the data from the original table at the same time - // TODO : Use lease and ts judge whether it is readable. - // TODO : If the cache is not readable. MemBuffer become invalid. - return c.MemBuffer != nil -} + +var mockStateRemote = struct { + Ch chan remoteTask + Data *mockStateRemoteData +}{} // NewCachedTable creates a new CachedTable Instance func NewCachedTable(tbl *TableCommon) (table.Table, error) { - return &cachedTable{ + if mockStateRemote.Data == nil { + mockStateRemote.Data = newMockStateRemoteData() + mockStateRemote.Ch = make(chan remoteTask, 100) + go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch) + } + ret := &cachedTable{ TableCommon: *tbl, - }, nil + handle: &mockStateRemoteHandle{mockStateRemote.Ch}, + } + + return ret, nil } -func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context) error { +func (c *cachedTable) loadDataFromOriginalTable(ctx sessionctx.Context, lease uint64) (kv.MemBuffer, error) { prefix := tablecodec.GenTablePrefix(c.tableID) txn, err := ctx.Txn(true) if err != nil { - return err + return nil, err } + if txn.StartTS() >= lease { + return nil, errors.New("the loaded data is outdate for caching") + } + buffTxn, err := ctx.GetStore().BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0)) if err != nil { - return err + return nil, err } + buffer := buffTxn.GetMemBuffer() it, err := txn.Iter(prefix, 
prefix.PrefixNext()) if err != nil { - return err + return nil, err } defer it.Close() - if !it.Valid() { - return nil - } for it.Valid() && it.Key().HasPrefix(prefix) { value := it.Value() err = buffer.Set(it.Key(), value) if err != nil { - return err + return nil, err } err = it.Next() if err != nil { - return err + return nil, err } } - c.mu.Lock() - c.MemBuffer = buffer - c.mu.Unlock() - return nil + + return buffer, nil } func (c *cachedTable) UpdateLockForRead(ctx sessionctx.Context, ts uint64) error { - // Now only the data is re-load here, and the lock information is not updated. any read-lock information update will in the next pr. - err := c.loadDataFromOriginalTable(ctx) + // Update the lock information first, then load data from the original table. + tid := c.Meta().ID + lease := leaseFromTS(ts) + succ, err := c.handle.LockForRead(tid, ts, lease) if err != nil { - return fmt.Errorf("reload data error") + return errors.Trace(err) } + if succ { + mb, err := c.loadDataFromOriginalTable(ctx, lease) + if err != nil { + return errors.Trace(err) + } + + c.cacheData.Store(&table.CacheData{ + Lease: lease, + MemBuffer: mb, + }) + } + // Otherwise the current lock state is not suitable for caching. return nil } + +// AddRecord implements the AddRecord method for the table.Table interface. +func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { + txn, err := ctx.Txn(true) + if err != nil { + return nil, err + } + now := txn.StartTS() + err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + if err != nil { + return nil, errors.Trace(err) + } + return c.TableCommon.AddRecord(ctx, r, opts...) 
+ +} + +// UpdateRecord implements table.Table +func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { + txn, err := sctx.Txn(true) + if err != nil { + return err + } + now := txn.StartTS() + err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + if err != nil { + return errors.Trace(err) + } + return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) +} + +// RemoveRecord implements table.Table RemoveRecord interface. +func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { + txn, err := ctx.Txn(true) + if err != nil { + return err + } + now := txn.StartTS() + err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + if err != nil { + return errors.Trace(err) + } + return c.TableCommon.RemoveRecord(ctx, h, r) +} diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index 9141899599d24..be37328235d40 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -157,11 +157,47 @@ func TestCacheCondition(t *testing.T) { // Normal query should trigger cache. 
tk.MustQuery("select * from t2") - var i int - for ; i < 10; i++ { - if tk.HasPlan("select * from t2 where id>0", "UnionScan") { - return + for !tk.HasPlan("select * from t2 where id>0", "UnionScan") { + tk.MustExec("select * from t2") + } +} + +func TestCacheTableBasicReadAndWrite(t *testing.T) { + t.Parallel() + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk.MustExec("drop table if exists write_tmp1") + tk.MustExec("create table write_tmp1 (id int primary key auto_increment, u int unique, v int)") + tk.MustExec("insert into write_tmp1 values" + + "(1, 101, 1001), (3, 113, 1003)", + ) + + tk.MustExec("alter table write_tmp1 cache") + // Read and add read lock + tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", + "3 113 1003")) + // read lock should valid + for i := 0; i < 10; i++ { + if tk.HasPlan("select *from write_tmp1", "UnionScan") { + break } } - require.True(t, i < 10) + tk.MustExec("use test") + tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") + // write lock exists + require.False(t, tk.HasPlan("select *from write_tmp1", "UnionScan")) + // wait write lock expire and check cache can be used again + for !tk.HasPlan("select *from write_tmp1", "UnionScan") { + tk.MustExec("select *from write_tmp1") + } + tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) + tk1.MustExec("update write_tmp1 set v = 3333 where id = 2") + for !tk.HasPlan("select *from write_tmp1", "UnionScan") { + tk.MustExec("select *from write_tmp1") + } + tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) } diff --git a/table/tables/main_test.go b/table/tables/main_test.go index be83c73b29f93..c7499ee0109aa 100644 --- a/table/tables/main_test.go +++ b/table/tables/main_test.go @@ -23,10 +23,10 @@ import ( func 
TestMain(m *testing.M) { testbridge.WorkaroundGoCheckFlags() - opts := []goleak.Option{ goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } goleak.VerifyTestMain(m, opts...) } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go new file mode 100644 index 0000000000000..5ef43271955b7 --- /dev/null +++ b/table/tables/state_remote.go @@ -0,0 +1,278 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables + +import ( + "fmt" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/tikv/client-go/v2/oracle" +) + +// CachedTableLockType defines the lock type for a cached table. +type CachedTableLockType int + +const ( + // CachedTableLockNone means there is no lock. + CachedTableLockNone CachedTableLockType = iota + // CachedTableLockRead is the READ lock type. + CachedTableLockRead + // CachedTableLockIntend is the write INTEND lock; it exists while changing from READ to WRITE and the READ lock lease has not expired yet. + CachedTableLockIntend + // CachedTableLockWrite is the WRITE lock type. 
+ CachedTableLockWrite +) + +// StateRemote indicates the remote status information of the read-write lock. +type StateRemote interface { + // Load obtains the corresponding lock type and lease value according to the tableID. + Load(tid int64) (CachedTableLockType, uint64, error) + + // LockForRead tries to add a read lock to the table with the specified tableID. + LockForRead(tid int64, now, ts uint64) (bool, error) + + // LockForWrite tries to add a write lock to the table with the specified tableID. + LockForWrite(tid int64, now, ts uint64) error + + // RenewLease attempts to renew the read lock on the table with the specified tableID. + RenewLease(tid int64, ts uint64) (bool, error) +} + +// mockStateRemoteHandle implements the StateRemote interface. +type mockStateRemoteHandle struct { + ch chan remoteTask +} + +func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, error) { + op := &loadOP{tid: tid} + op.Add(1) + r.ch <- op + op.Wait() + return op.lockType, op.lease, op.err +} + +func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, error) { + op := &lockForReadOP{tid: tid, now: now, ts: ts} + op.Add(1) + r.ch <- op + op.Wait() + return op.succ, op.err +} + +func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { + op := &lockForWriteOP{tid: tid, now: now, ts: ts} + op.Add(1) + r.ch <- op + op.Wait() + if op.err != nil { + return errors.Trace(op.err) + } + // Not blocked by an old read lease, finish. + if op.oldLease == 0 { + return nil + } + + // Wait for read lock to expire. + t1 := oracle.GetTimeFromTS(op.oldLease) + t2 := oracle.GetTimeFromTS(now) + waitDuration := t1.Sub(t2) + time.Sleep(waitDuration) + + // TODO: now should be a new ts + op = &lockForWriteOP{tid: tid, now: op.oldLease + 1, ts: leaseFromTS(op.oldLease + 1)} + op.Add(1) + r.ch <- op + op.Wait() + // op.oldLease should be 0 this time. 
+ return op.err +} + +func (r *mockStateRemoteHandle) RenewLease(tid int64, ts uint64) (bool, error) { + return false, errors.New("not implemented yet") +} + +func mockRemoteService(r *mockStateRemoteData, ch chan remoteTask) { + for task := range ch { + task.Exec(r) + } +} + +type remoteTask interface { + Exec(data *mockStateRemoteData) +} + +// loadOP is a kind of remoteTask +type loadOP struct { + sync.WaitGroup + // Input + tid int64 + + // Output + lockType CachedTableLockType + lease uint64 + err error +} + +func (op *loadOP) Exec(data *mockStateRemoteData) { + op.lockType, op.lease, op.err = data.Load(op.tid) + op.Done() +} + +// lockForReadOP is a kind of remoteTask +type lockForReadOP struct { + sync.WaitGroup + // Input + tid int64 + now uint64 + ts uint64 + + // Output + succ bool + err error +} + +func (op *lockForReadOP) Exec(r *mockStateRemoteData) { + op.succ, op.err = r.LockForRead(op.tid, op.now, op.ts) + op.Done() +} + +// lockForWriteOP is a kind of remoteTask +type lockForWriteOP struct { + sync.WaitGroup + // Input + tid int64 + now uint64 + ts uint64 + + // Output + err error + oldLease uint64 +} + +func (op *lockForWriteOP) Exec(data *mockStateRemoteData) { + op.oldLease, op.err = data.LockForWrite(op.tid, op.now, op.ts) + op.Done() +} + +type mockStateRemoteData struct { + data map[int64]*stateRecord +} + +type stateRecord struct { + lockLease uint64 + oldReadLease uint64 // only used for the intend lock; it holds the old read lease. 
+ lockType CachedTableLockType +} + +func newMockStateRemoteData() *mockStateRemoteData { + return &mockStateRemoteData{ + data: make(map[int64]*stateRecord), + } +} + +func (r *mockStateRemoteData) Load(tid int64) (CachedTableLockType, uint64, error) { + record, ok := r.data[tid] + if !ok { + return CachedTableLockNone, 0, nil + } + return record.lockType, record.lockLease, nil +} + +func (r *mockStateRemoteData) LockForRead(tid int64, now, ts uint64) (bool, error) { + record, ok := r.data[tid] + if !ok { + record = &stateRecord{ + lockLease: ts, + oldReadLease: ts, + lockType: CachedTableLockRead, + } + r.data[tid] = record + return true, nil + } + switch record.lockType { + case CachedTableLockNone: + // Add the read lock + record.lockType = CachedTableLockRead + record.lockLease = ts + return true, nil + case CachedTableLockRead: + // Renew lease for this case. + if record.lockLease < ts { + record.lockLease = ts + return true, nil + } + // Already read locked. + return true, nil + case CachedTableLockWrite, CachedTableLockIntend: + if now > record.lockLease { + // Outdated...clear orphan lock + record.lockType = CachedTableLockRead + record.lockLease = ts + return true, nil + } + return false, nil + } + return false, errors.New("unknown lock type") +} + +func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, error) { + record, ok := r.data[tid] + if !ok { + record = &stateRecord{ + lockType: CachedTableLockWrite, + lockLease: ts, + } + r.data[tid] = record + return 0, nil + } + + switch record.lockType { + case CachedTableLockNone: + record.lockType = CachedTableLockWrite + record.lockLease = ts + return 0, nil + case CachedTableLockRead: + if now > record.lockLease { + // Outdated, clear orphan lock and add write lock directly. + record.lockType = CachedTableLockWrite + record.lockLease = ts + return 0, nil + } + + // Change state to intend, prevent renew lease operation. 
+ oldLease := record.lockLease + record.lockType = CachedTableLockIntend + record.lockLease = leaseFromTS(ts) + record.oldReadLease = oldLease + return oldLease, nil + case CachedTableLockWrite: + if ts > record.lockLease { + record.lockLease = ts + } + case CachedTableLockIntend: + // Add the write lock. + if now > record.oldReadLease { + record.lockType = CachedTableLockWrite + record.lockLease = ts + } else { + return record.oldReadLease, nil + } + default: + return 0, fmt.Errorf("wrong lock state %v", record.lockType) + } + return 0, nil +}