From da6625bb5ab1f1a01604a08715cb9edc741a9f21 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 23 Nov 2021 15:45:28 +0800 Subject: [PATCH 01/13] apply patch --- ddl/ddl_api.go | 18 +- domain/domain.go | 16 +- executor/builder.go | 2 +- infoschema/builder.go | 14 +- planner/core/logical_plan_builder.go | 2 +- session/bootstrap.go | 21 +- table/table.go | 5 +- table/tables/cache.go | 43 ++-- table/tables/cache_test.go | 135 ++++++++----- table/tables/state_remote.go | 283 +++++++++++++++++++++++++-- table/tables/state_remote_test.go | 128 ++++++++++++ table/tables/tables.go | 4 +- 12 files changed, 566 insertions(+), 105 deletions(-) create mode 100644 table/tables/state_remote_test.go diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 76d9a786988cd..5e9f4549f0403 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/ddl/label" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -6624,7 +6625,22 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error) err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + + _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) + + // exec := ctx.(sqlexec.RestrictedSQLExecutor) + // stmt, err := exec.ParseWithParams(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) + // if err != nil { + // return errors.Trace(err) + // } + // _, _, err = exec.ExecRestrictedStmt(context.Background(), stmt) + if err != nil { + return errors.Trace(err) + } + return err } func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error) { diff --git a/domain/domain.go 
b/domain/domain.go index c5d8dc8246cd0..d5f27ab453e09 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -95,6 +95,7 @@ type Domain struct { isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. renewLeaseCh chan func() // It is used to call the renewLease function of the cache table. onClose func() + sysFactory func(*Domain) (pools.Resource, error) } // loadInfoSchema loads infoschema at startTS. @@ -159,7 +160,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -173,6 +174,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return is, false, currentSchemaVersion, nil, nil } +func (do *Domain) sysFacHack() (pools.Resource, error) { + // TODO: Here we create new sessions with sysFac in DDL, + // which will use `do` as Domain instead of call `domap.Get`. + // That's because `domap.Get` requires a lock, but before + // we initialize Domain finish, we can't require that again. + // After we remove the lazy logic of creating Domain, we + // can simplify code here. 
+ return do.sysFactory(do) +} + func (do *Domain) fetchPolicies(m *meta.Meta) ([]*model.PolicyInfo, error) { allPolicies, err := m.ListPolicies() if err != nil { @@ -271,7 +282,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -712,6 +723,7 @@ const serverIDForStandalone = 1 // serverID for standalone deployment. // Init initializes a domain. func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error { + do.sysFactory = sysFactory perfschema.Init() if ebd, ok := do.store.(kv.EtcdBackend); ok { var addrs []string diff --git a/executor/builder.go b/executor/builder.go index c5ac06285e411..b7ee4dd953088 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -4697,7 +4697,7 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64 } }() if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { - err := tbl.(table.CachedTable).UpdateLockForRead(b.ctx.GetStore(), startTS) + err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS) if err != nil { log.Warn("Update Lock Info Error") } diff --git a/infoschema/builder.go b/infoschema/builder.go index d0ab2d58ac273..58baa6d27eec2 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/ngaut/pools" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" @@ -32,6 +33,7 @@ import ( 
"github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/domainutil" ) @@ -43,6 +45,7 @@ type Builder struct { store kv.Storage // TODO: renewLeaseCh is only used to pass data between table and domain renewLeaseCh chan func() + factory func() (pools.Resource, error) } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -630,7 +633,13 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf return nil, errors.Trace(err) } if t, ok := ret.(table.CachedTable); ok { - err = t.Init(b.renewLeaseCh) + var tmp pools.Resource + tmp, err = b.factory() + if err != nil { + return nil, errors.Trace(err) + } + + err = t.Init(b.renewLeaseCh, tmp.(sqlexec.SQLExecutor)) if err != nil { return nil, errors.Trace(err) } @@ -674,7 +683,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. 
-func NewBuilder(store kv.Storage, renewCh chan func()) *Builder { +func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Resource, error)) *Builder { return &Builder{ store: store, is: &infoSchema{ @@ -684,6 +693,7 @@ func NewBuilder(store kv.Storage, renewCh chan func()) *Builder { sortedTablesBuckets: make([]sortedTables, bucketCount), }, renewLeaseCh: renewCh, + factory: factory, } } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 02451a4c4cb65..76cf1526034b3 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4209,7 +4209,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as if r := recover(); r != nil { } }() - err := cachedTable.UpdateLockForRead(b.ctx.GetStore(), txn.StartTS()) + err := cachedTable.UpdateLockForRead(ctx, b.ctx.GetStore(), txn.StartTS()) if err != nil { log.Warn("Update Lock Info Error") } diff --git a/session/bootstrap.go b/session/bootstrap.go index 573c17bdaaf2e..763832adadd31 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -357,6 +357,14 @@ const ( last_analyzed_at TIMESTAMP, PRIMARY KEY (table_id, column_id) CLUSTERED );` + // CreateTableCacheMetaTable stores the cached table meta lock information. + CreateTableCacheMetaTable = `CREATE TABLE IF NOT EXISTS mysql.table_cache_meta ( + tid int(11) NOT NULL DEFAULT 0, + lock_type enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE', + lease bigint(20) NOT NULL DEFAULT 0, + oldReadLease bigint(20) NOT NULL DEFAULT 0, + PRIMARY KEY (tid) + );` ) // bootstrap initiates system DB for a store. @@ -528,11 +536,13 @@ const ( version77 = 77 // version78 updates mysql.stats_buckets.lower_bound, mysql.stats_buckets.upper_bound and mysql.stats_histograms.last_analyze_pos from BLOB to LONGBLOB. 
version78 = 78 + // version79 adds the mysql.table_cache_meta table + version79 = 79 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version78 +var currentBootstrapVersion int64 = version79 var ( bootstrapVersion = []func(Session, int64){ @@ -1612,6 +1622,13 @@ func upgradeToVer78(s Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms MODIFY last_analyze_pos LONGBLOB DEFAULT NULL") } +func upgradeToVer79(s Session, ver int64) { + if ver >= version79 { + return + } + doReentrantDDL(s, CreateTableCacheMetaTable) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -1694,6 +1711,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateCapturePlanBaselinesBlacklist) // Create column_stats_usage table mustExecute(s, CreateColumnStatsUsageTable) + // Create table_cache_meta table. + mustExecute(s, CreateTableCacheMetaTable) } // doDMLWorks executes DML statements in bootstrap stage. diff --git a/table/table.go b/table/table.go index c96c77b587d19..b78b7b6abb0f4 100644 --- a/table/table.go +++ b/table/table.go @@ -24,6 +24,7 @@ import ( "github.com/opentracing/opentracing-go" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" @@ -252,12 +253,12 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table - Init(renewCh chan func()) error + Init(renewCh chan func(), exec sqlexec.SQLExecutor) error // TryReadFromCache checks if the cache table is readable. 
TryReadFromCache(ts uint64) kv.MemBuffer // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table - UpdateLockForRead(store kv.Storage, ts uint64) error + UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error } diff --git a/table/tables/cache.go b/table/tables/cache.go index c7e4cea2e8814..33cd5d0429350 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -15,6 +15,7 @@ package tables import ( + "fmt" "context" "sync/atomic" "time" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" ) @@ -41,7 +43,6 @@ const ( ) var ( - _ table.Table = &cachedTable{} _ table.CachedTable = &cachedTable{} ) @@ -80,6 +81,7 @@ func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) { func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { tmp := c.cacheData.Load() if tmp == nil { + fmt.Println("TryReadFromCache return nil because ... data not loaded") return nil } data := tmp.(*cacheData) @@ -93,36 +95,27 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { } return data } + fmt.Println("TryReadFromCache return nil because ... ts not correct...", ts, data.Start, data.Lease) return nil } -// MockStateRemote represents the information of stateRemote. -// Exported it only for testing. 
-var MockStateRemote = struct { - Ch chan remoteTask - Data *mockStateRemoteData -}{} - -// NewCachedTable creates a new CachedTable Instance -func NewCachedTable(tbl *TableCommon) (table.Table, error) { - if MockStateRemote.Data == nil { - MockStateRemote.Data = newMockStateRemoteData() - MockStateRemote.Ch = make(chan remoteTask, 100) - go mockRemoteService(MockStateRemote.Data, MockStateRemote.Ch) - } - +// newCachedTable creates a new CachedTable Instance +func newCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ TableCommon: *tbl, - handle: &mockStateRemoteHandle{MockStateRemote.Ch}, - renewCh: make(chan func()), } return ret, nil } // Init is an extra operation for cachedTable after TableFromMeta, // Because cachedTable need some additional parameter that can't be passed in TableFromMeta. -func (c *cachedTable) Init(renewCh chan func()) error { +func (c *cachedTable) Init(renewCh chan func(), exec sqlexec.SQLExecutor) error { c.renewCh = renewCh + raw, ok := exec.(sqlExec) + if !ok { + return errors.New("Need sqlExec rather than sqlexec.SQLExecutor") + } + c.handle = NewStateRemote(raw) return nil } @@ -167,11 +160,11 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) return buffer, startTS, nil } -func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error { +func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error { // Load data from original table and the update lock information. tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.LockForRead(tid, ts, lease) + succ, err := c.handle.LockForRead(ctx, tid, ts, lease) if err != nil { return errors.Trace(err) } @@ -198,7 +191,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. 
return nil, err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) if err != nil { return nil, errors.Trace(err) } @@ -212,7 +205,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, return err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -226,7 +219,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -237,7 +230,7 @@ func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) return func() { tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.RenewLease(tid, ts, lease, op) + succ, err := c.handle.RenewLease(context.Background(), tid, ts, lease, op) if err != nil { log.Warn("Renew read lease error") } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index a435f10422e85..e203ccae25cbd 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,12 +15,13 @@ package tables_test import ( + "fmt" "testing" "time" - "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/table/tables" + // "github.com/pingcap/tidb/infoschema" + // "github.com/pingcap/tidb/parser/model" + // "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) @@ -172,18 +173,21 @@ func TestCacheCondition(t *testing.T) { // Update should not trigger cache. 
tk.MustExec("update t2 set v = v + 1 where id > 0") + fmt.Println("======= update 1") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id = 2") + fmt.Println("======= update 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Delete should not trigger cache. + fmt.Println("======= delete ") tk.MustExec("delete from t2 where id = 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) @@ -212,28 +216,28 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { tk.MustExec("alter table write_tmp1 cache") // Read and add read lock - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003")) // read lock should valid for i := 0; i < 10; i++ { - if tk.HasPlan("select *from write_tmp1", "UnionScan") { + if tk.HasPlan("select * from write_tmp1", "UnionScan") { break } } tk.MustExec("use test") tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") // write lock exists - require.False(t, tk.HasPlan("select *from write_tmp1", "UnionScan")) + require.False(t, tk.HasPlan("select * from write_tmp1", "UnionScan")) // wait write lock expire and check cache can be used again - for !tk.HasPlan("select *from write_tmp1", "UnionScan") { - tk.MustExec("select *from write_tmp1") + for !tk.HasPlan("select * from write_tmp1", "UnionScan") { + tk.MustExec("select * from write_tmp1") } - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) tk1.MustExec("update write_tmp1 set v = 3333 where id = 2") - for !tk.HasPlan("select *from 
write_tmp1", "UnionScan") { - tk.MustExec("select *from write_tmp1") + for !tk.HasPlan("select * from write_tmp1", "UnionScan") { + tk.MustExec("select * from write_tmp1") } - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) } func TestCacheTableComplexRead(t *testing.T) { @@ -247,25 +251,40 @@ func TestCacheTableComplexRead(t *testing.T) { tk1.MustExec("create table complex_cache (id int primary key auto_increment, u int unique, v int)") tk1.MustExec("insert into complex_cache values" + "(5, 105, 1005), (7, 117, 1007), (9, 109, 1009)") tk1.MustExec("alter table complex_cache cache") + tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + var i int + for i = 0; i < 100; i++ { + time.Sleep(100 * time.Millisecond) + if tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + break + } + } + require.True(t, i < 10) + tk1.MustExec("begin") - tk1.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - - go func() { - tk2.MustExec("begin") - tk2.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - var i int - for i = 0; i < 10; i++ { - time.Sleep(100 * time.Millisecond) - if tk2.HasPlan("select *from complex_cache where id > 7", "UnionScan") { - break - } + // go func() { + // defer func() { + // if r := recover(); r != nil { + // fmt.Println("xxxx", r) + // } + // }() + tk2.MustExec("begin") + tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + fmt.Println("run here 1111111111111111111111") + for i = 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + if tk2.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + break } - require.True(t, i < 10) - tk2.MustExec("commit") - doneCh <- struct{}{} - }() - <-doneCh - tk1.HasPlan("select 
*from complex_cache where id > 7", "UnionScan") + fmt.Println("run here 2222222222222222222222222") + } + require.True(t, i < 10) + tk2.MustExec("commit") + doneCh <- struct{}{} + // }() + // <-doneCh + + tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") tk1.MustExec("commit") } @@ -285,10 +304,18 @@ func TestBeginSleepABA(t *testing.T) { tk1.MustExec("insert into aba values (1, 1)") tk1.MustExec("alter table aba cache") tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) + cacheUsed := false + for i := 0; i < 100; i++ { + if tk1.HasPlan("select * from aba", "UnionScan") { + cacheUsed = true + break + } + } + require.True(t, cacheUsed) // Begin, read from cache. tk1.MustExec("begin") - cacheUsed := false + cacheUsed = false for i := 0; i < 100; i++ { if tk1.HasPlan("select * from aba", "UnionScan") { cacheUsed = true @@ -393,27 +420,27 @@ func TestCacheTableBatchPointGet(t *testing.T) { tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 14)").Check(testkit.Rows("1 11 101")) } -func TestRenewLease(t *testing.T) { - // Test RenewLeaseForRead - store, clean := testkit.CreateMockStore(t) - defer clean() - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - se := tk.Session() - tk.MustExec("create table cache_renew_t (id int)") - tk.MustExec("alter table cache_renew_t cache") - tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t")) - require.NoError(t, err) - var i int - tk.MustExec("select * from cache_renew_t") - _, oldLease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) - for i = 0; i < 20; i++ { - time.Sleep(200 * time.Millisecond) - tk.MustExec("select * from cache_renew_t") - _, lease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) - if lease != oldLease { - break - } - } - require.True(t, i < 20) -} +// func TestRenewLease(t *testing.T) { +// // Test RenewLeaseForRead +// store, clean := testkit.CreateMockStore(t) +// defer clean() +// 
tk := testkit.NewTestKit(t, store) +// tk.MustExec("use test") +// se := tk.Session() +// tk.MustExec("create table cache_renew_t (id int)") +// tk.MustExec("alter table cache_renew_t cache") +// tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t")) +// require.NoError(t, err) +// var i int +// tk.MustExec("select * from cache_renew_t") +// _, oldLease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) +// for i = 0; i < 20; i++ { +// time.Sleep(200 * time.Millisecond) +// tk.MustExec("select * from cache_renew_t") +// _, lease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) +// if lease != oldLease { +// break +// } +// } +// require.True(t, i < 20) +// } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 46a304127ec5e..a2b3c710e36dd 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -15,11 +15,15 @@ package tables import ( + "context" "fmt" "sync" "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" ) @@ -37,19 +41,33 @@ const ( CachedTableLockWrite ) -// StateRemote Indicates the remote status information of the read-write lock +func (l CachedTableLockType) String() string { + switch l { + case CachedTableLockNone: + return "NONE" + case CachedTableLockRead: + return "READ" + case CachedTableLockIntend: + return "INTEND" + case CachedTableLockWrite: + return "WRITE" + } + panic("invalid CachedTableLockType value") +} + +// StateRemote is the interface to control the remote state of the cached table's lock meta information. 
type StateRemote interface { // Load obtain the corresponding lock type and lease value according to the tableID - Load(tid int64) (CachedTableLockType, uint64, error) + Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) // LockForRead try to add a read lock to the table with the specified tableID - LockForRead(tid int64, now, ts uint64) (bool, error) + LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(tid int64, now, ts uint64) error + LockForWrite(ctx context.Context, tid int64, now, ts uint64) error // RenewLease attempt to renew the read / write lock on the table with the specified tableID - RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) + RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) } // mockStateRemoteHandle implement the StateRemote interface. @@ -57,7 +75,9 @@ type mockStateRemoteHandle struct { ch chan remoteTask } -func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, error) { +var _ StateRemote = &mockStateRemoteHandle{} + +func (r *mockStateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { op := &loadOP{tid: tid} op.Add(1) r.ch <- op @@ -65,7 +85,7 @@ func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, er return op.lockType, op.lease, op.err } -func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, error) { +func (r *mockStateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) { op := &lockForReadOP{tid: tid, now: now, ts: ts} op.Add(1) r.ch <- op @@ -73,7 +93,7 @@ func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, er return op.succ, op.err } -func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { +func (r 
*mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { op := &lockForWriteOP{tid: tid, now: now, ts: ts} op.Add(1) r.ch <- op @@ -101,7 +121,7 @@ func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { return op.err } -func (r *mockStateRemoteHandle) RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) { +func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) { switch op { case RenewReadLease: op := &renewLeaseForReadOP{tid: tid, oldTs: oldTs, newTs: newTs} @@ -206,11 +226,11 @@ type stateRecord struct { lockType CachedTableLockType } -func newMockStateRemoteData() *mockStateRemoteData { - return &mockStateRemoteData{ - data: make(map[int64]*stateRecord), - } -} +// func newMockStateRemoteData() *mockStateRemoteData { +// return &mockStateRemoteData{ +// data: make(map[int64]*stateRecord), +// } +// } func (r *mockStateRemoteData) Load(tid int64) (CachedTableLockType, uint64, error) { record, ok := r.data[tid] @@ -326,5 +346,240 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs u return true, nil } return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") +} + +type sqlExec interface { + AffectedRows() uint64 + ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error) + GetStore() kv.Storage +} + +type stateRemoteHandle struct { + exec sqlExec + sync.Mutex +} + +// NewStateRemote creates a StateRemote object. 
+func NewStateRemote(exec sqlExec) *stateRemoteHandle { + return &stateRemoteHandle{ + exec: exec, + } +} + +var _ StateRemote = &stateRemoteHandle{} + +func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { + lockType, lease, _, err := h.loadRow(ctx, tid) + return lockType, lease, err +} + +func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) { + h.Lock() + defer h.Unlock() + succ := false + err := h.runInTxn(ctx, func(ctx context.Context) error { + lockType, lease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + // The old lock is outdated, clear orphan lock. + if now > lease { + succ = true + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return errors.Trace(err) + } + return nil + } + + switch lockType { + case CachedTableLockNone: + case CachedTableLockRead: + case CachedTableLockWrite, CachedTableLockIntend: + return nil + } + succ = true + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return errors.Trace(err) + } + + return nil + }) + return succ, err +} + +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { + h.Lock() + defer h.Unlock() + for { + retry, err := h.lockForWriteOnce(ctx, tid, now, ts) + if err != nil { + return err + } + if !retry { + break + } + + store := h.exec.GetStore() + o := store.GetOracle() + newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) + if err != nil { + return errors.Trace(err) + } + now, ts = newTS, leaseFromTS(newTS) + } + return nil +} + +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) ( /*retry*/ bool, error) { + err := h.beginTxn(ctx) + if err != nil { + return false, errors.Trace(err) + } + defer func() { + if err != nil { + fmt.Println("defer lockForWriteOnce, err ==", err) + h.rollbackTxn(ctx) + } + }() + + lockType, lease, oldReadLease, 
err := h.loadRow(ctx, tid) + if err != nil { + return false, errors.Trace(err) + } + // The lease is outdated, so lock is invalid, clear orphan lock of any kind. + if now > lease { + if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return false, errors.Trace(err) + } + return false, h.commitTxn(ctx) + } + + // The lease is valid. + switch lockType { + case CachedTableLockNone: + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return false, errors.Trace(err) + } + return false, h.commitTxn(ctx) + case CachedTableLockRead: + // Change from READ to INTEND + if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil { + return false, errors.Trace(err) + } + if err = h.commitTxn(ctx); err != nil { + return false, errors.Trace(err) + } + + // Wait for lease to expire, and then retry. + waitForLeaseExpire(oldReadLease, now) + return true, nil + case CachedTableLockIntend, CachedTableLockWrite: + // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. + if now > oldReadLease { + if lockType == CachedTableLockIntend { + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return false, errors.Trace(err) + } + } + return false, h.commitTxn(ctx) + } + + // Otherwise, the WRITE should wait for the READ lease expire. 
+ fmt.Println("rollback txn wait for read lease...") + h.rollbackTxn(ctx) + waitForLeaseExpire(oldReadLease, now) + // And then retry change the lock to WRITE + return true, nil + } + return false, errors.New("should never run here") +} + +func waitForLeaseExpire(oldReadLease, now uint64) { + if oldReadLease >= now { + t1 := oracle.GetTimeFromTS(oldReadLease) + t2 := oracle.GetTimeFromTS(now) + waitDuration := t1.Sub(t2) + fmt.Println("wait for lease expirte ===", waitDuration) + time.Sleep(waitDuration) + } +} + +func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, now, ts uint64, _ RenewLeaseType) (bool, error) { + h.Lock() + defer h.Unlock() + _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? and lock_type ='READ'", ts, tid) + if err != nil { + return false, errors.Trace(err) + } + succ := h.exec.AffectedRows() > 0 + return succ, err +} +func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "begin") + fmt.Printf("EEEEEEEEEEEEEEEE begin ...%p...\n", h.exec) + return err +} + +func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "commit") + fmt.Printf("EEEEEEEEEEEEEEEE commit %v...%p ...\n", err, h.exec) + return err +} + +func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "rollback") + fmt.Println("EEEEEEEEEEEEEEEE rollback") + return err +} + +func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context) error) error { + err := h.beginTxn(ctx) + if err != nil { + return errors.Trace(err) + } + + err = fn(ctx) + if err != nil { + h.rollbackTxn(ctx) + fmt.Println("==== rollback exec error ===", err) + return errors.Trace(err) + } + + return h.commitTxn(ctx) +} + +func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) { + chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from 
mysql.table_cache_meta where tid = %? for update", tid) + if err != nil { + return 0, 0, 0, errors.Trace(err) + } + if len(chunkRows) != 1 { + return 0, 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid) + } + col1 := chunkRows[0].GetEnum(0) + // Note, the MySQL enum value start from 1 rather than 0 + lockType := CachedTableLockType(col1.Value - 1) + lease := chunkRows[0].GetUint64(1) + oldReadLease := chunkRows[0].GetUint64(2) + return lockType, lease, oldReadLease, nil +} + +func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int64, lockType string, lease uint64) error { + _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lock_type = %?, lease = %? where tid = %?", lockType, lease, tid) + return err +} + +func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { + rs, err := h.exec.ExecuteInternal(ctx, sql, args...) + if rs != nil { + defer rs.Close() + } + if err != nil { + return nil, errors.Trace(err) + } + if rs != nil { + return sqlexec.DrainRecordSet(ctx, rs, 1) + } + return nil, nil } diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go new file mode 100644 index 0000000000000..969f0fb56b0e0 --- /dev/null +++ b/table/tables/state_remote_test.go @@ -0,0 +1,128 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tables_test + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +// CreateMetaLockForCachedTable initializes the cached table meta lock information. +func createMetaLockForCachedTable(h session.Session) error { + createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + + "`tid` int(11) NOT NULL DEFAULT 0," + + "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + + "`lease` bigint(20) NOT NULL DEFAULT 0," + + "`oldReadLease` bigint(20) NOT NULL DEFAULT 0," + + "PRIMARY KEY (`tid`))" + _, err := h.ExecuteInternal(context.Background(), createTable) + return err +} + +// InitRow add a new record into the cached table meta lock table. +func initRow(ctx context.Context, exec session.Session, tid int) error { + _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", tid) + return err +} + +func TestStateRemote(t *testing.T) { + t.Parallel() + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + ctx := context.Background() + h := tables.NewStateRemote(se) + err := createMetaLockForCachedTable(se) + require.NoError(t, err) + require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) + + // Check the initial value. + require.NoError(t, initRow(ctx, se, 5)) + lockType, lease, err := h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockNone) + require.Equal(t, lockType.String(), "NONE") + require.Equal(t, lease, uint64(0)) + + // Check read lock. 
+ succ, err := h.LockForRead(ctx, 5, 1234, 1234) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1234)) + + // LockForRead when read lock is hold. + // This operation equals to renew lease. + succ, err = h.LockForRead(ctx, 5, 1235, 1235) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1235)) + + // Renew read lock lease operation. + succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1264)) + + // Check write lock. + require.NoError(t, h.LockForWrite(ctx, 5, 2234, 2234)) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, uint64(2234)) + + // Lock for write again + require.NoError(t, h.LockForWrite(ctx, 5, 3234, 3234)) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, uint64(3234)) + + // Renew read lock lease should fail when the write lock is hold. + succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease) + require.NoError(t, err) + require.False(t, succ) + + // Acquire read lock should also fail when the write lock is hold. 
+ succ, err = h.LockForRead(ctx, 5, 1264, 1264) + require.NoError(t, err) + require.False(t, succ) + + // But clear orphan write lock should success. + succ, err = h.LockForRead(ctx, 5, 4234, 4234) + require.NoError(t, err) + require.True(t, succ) +} diff --git a/table/tables/tables.go b/table/tables/tables.go index 5adf626ca2f78..716b2e879cbcc 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -85,7 +85,7 @@ func MockTableFromMeta(tblInfo *model.TableInfo) table.Table { var t TableCommon initTableCommon(&t, tblInfo, tblInfo.ID, columns, nil) if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable { - ret, err := NewCachedTable(&t) + ret, err := newCachedTable(&t) if err != nil { return nil } @@ -153,7 +153,7 @@ func TableFromMeta(allocs autoid.Allocators, tblInfo *model.TableInfo) (table.Ta return nil, err } if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable { - return NewCachedTable(&t) + return newCachedTable(&t) } return &t, nil } From 455b2f0604b47e5dd18035c92c08f32247938e99 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 23 Nov 2021 20:05:48 +0800 Subject: [PATCH 02/13] update StateRemote interface and tests --- ddl/placement_policy_ddl_test.go | 2 +- executor/infoschema_reader_test.go | 2 +- executor/main_test.go | 1 - executor/slow_query_test.go | 2 +- expression/main_test.go | 1 - infoschema/infoschema_test.go | 6 +- table/tables/cache.go | 19 +-- table/tables/cache_test.go | 76 +++++++---- table/tables/main_test.go | 1 - table/tables/state_remote.go | 207 ++++++++++++++++------------- table/tables/state_remote_test.go | 41 ++++-- telemetry/main_test.go | 1 - 12 files changed, 206 insertions(+), 153 deletions(-) diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go index 22f07656d350b..6053a2daa652b 100644 --- a/ddl/placement_policy_ddl_test.go +++ b/ddl/placement_policy_ddl_test.go @@ -113,7 +113,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) { t4.State = 
model.StatePublic db1.Tables = append(db1.Tables, t4) - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos( []*model.DBInfo{db1, db2, dbP}, nil, []*model.PolicyInfo{p1, p2, p3, p4, p5}, diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go index 05f85e8275b06..c92ce7da52495 100644 --- a/executor/infoschema_reader_test.go +++ b/executor/infoschema_reader_test.go @@ -918,7 +918,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) { tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows( "test 2", )) - c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 26) + c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 27) // More tests about the privileges. 
tk.MustExec("create user 'testuser'@'localhost'") diff --git a/executor/main_test.go b/executor/main_test.go index a0ecb2044be78..1087ff5bdd5f4 100644 --- a/executor/main_test.go +++ b/executor/main_test.go @@ -55,7 +55,6 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), - goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } callback := func(i int) int { testDataMap.GenerateOutputIfNeeded() diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 66ca71897602f..ec535f6f82781 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu } func newSlowQueryRetriever() (*slowQueryRetriever, error) { - newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) + newISBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/expression/main_test.go b/expression/main_test.go index 84a4ad15392ad..1c68ba643159b 100644 --- a/expression/main_test.go +++ b/expression/main_test.go @@ -48,7 +48,6 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } goleak.VerifyTestMain(m, opts...) 
diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 87c1695833f4d..c66fc5e69241c 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -109,7 +109,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1) + builder, err := infoschema.NewBuilder(dom.Store(), nil, nil).InitWithDBInfos(dbInfos, nil, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -259,7 +259,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -326,7 +326,7 @@ func TestGetBundle(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() diff --git a/table/tables/cache.go b/table/tables/cache.go index 33cd5d0429350..34e699faefbb3 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -19,6 +19,7 @@ import ( "context" "sync/atomic" "time" + "go.uber.org/zap" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -90,10 +91,10 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { nowTime := oracle.GetTimeFromTS(ts) distance := leaseTime.Sub(nowTime) // TODO make this configurable in the following PRs - if distance >= 0 && distance <= (1*time.Second) { + if distance >= 0 && distance <= (1500*time.Millisecond) { c.renewCh <- c.renewLease(ts, RenewReadLease, data) } - return data + return data.MemBuffer } fmt.Println("TryReadFromCache return nil because ... 
ts not correct...", ts, data.Start, data.Lease) return nil @@ -164,7 +165,7 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t // Load data from original table and the update lock information. tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.LockForRead(ctx, tid, ts, lease) + succ, err := c.handle.LockForRead(ctx, tid, lease) if err != nil { return errors.Trace(err) } @@ -191,7 +192,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. return nil, err } now := txn.StartTS() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) if err != nil { return nil, errors.Trace(err) } @@ -205,7 +206,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, return err } now := txn.StartTS() - err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -219,7 +220,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } now := txn.StartTS() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -230,15 +231,15 @@ func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) return func() { tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.RenewLease(context.Background(), tid, ts, lease, op) + succ, err := c.handle.RenewLease(context.Background(), tid, lease, op) if err != nil { - log.Warn("Renew read lease error") + log.Warn("Renew read lease error", zap.Error(err)) } if succ { c.cacheData.Store(&cacheData{ Start: data.Start, Lease: lease, - MemBuffer: data, + MemBuffer: data.MemBuffer, }) } } diff 
--git a/table/tables/cache_test.go b/table/tables/cache_test.go index e203ccae25cbd..f78bbdd535e3d 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -18,10 +18,11 @@ import ( "fmt" "testing" "time" + "context" - // "github.com/pingcap/tidb/infoschema" - // "github.com/pingcap/tidb/parser/model" - // "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) @@ -219,11 +220,14 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003")) // read lock should valid - for i := 0; i < 10; i++ { + var i int + for i = 0; i < 10; i++ { if tk.HasPlan("select * from write_tmp1", "UnionScan") { break } } + require.True(t, i<10) + tk.MustExec("use test") tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") // write lock exists @@ -420,27 +424,43 @@ func TestCacheTableBatchPointGet(t *testing.T) { tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 14)").Check(testkit.Rows("1 11 101")) } -// func TestRenewLease(t *testing.T) { -// // Test RenewLeaseForRead -// store, clean := testkit.CreateMockStore(t) -// defer clean() -// tk := testkit.NewTestKit(t, store) -// tk.MustExec("use test") -// se := tk.Session() -// tk.MustExec("create table cache_renew_t (id int)") -// tk.MustExec("alter table cache_renew_t cache") -// tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t")) -// require.NoError(t, err) -// var i int -// tk.MustExec("select * from cache_renew_t") -// _, oldLease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) -// for i = 0; i < 20; i++ { -// time.Sleep(200 * time.Millisecond) -// tk.MustExec("select * from cache_renew_t") -// _, lease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) -// if lease 
!= oldLease { -// break -// } -// } -// require.True(t, i < 20) -// } +func TestRenewLease(t *testing.T) { + // Test RenewLeaseForRead + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + tk.MustExec("create table cache_renew_t (id int)") + tk.MustExec("alter table cache_renew_t cache") + tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t")) + require.NoError(t, err) + var i int + tk.MustExec("select * from cache_renew_t") + + tk1 := testkit.NewTestKit(t, store) + remote := tables.NewStateRemote(tk1.Session()) + var leaseBefore uint64 + for i =0; i<20; i++ { + time.Sleep(200 * time.Millisecond) + lockType, lease, err := remote.Load(context.Background(), tbl.Meta().ID) + require.NoError(t, err) + if lockType == tables.CachedTableLockRead { + leaseBefore = lease + break + } + } + require.True(t, i < 20) + + for i = 0; i < 20; i++ { + time.Sleep(200 * time.Millisecond) + tk.MustExec("select * from cache_renew_t") + lockType, lease, err := remote.Load(context.Background(), tbl.Meta().ID) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + if leaseBefore != lease { + break + } + } + require.True(t, i < 20) +} diff --git a/table/tables/main_test.go b/table/tables/main_test.go index c7499ee0109aa..ebfceb2bd3bca 100644 --- a/table/tables/main_test.go +++ b/table/tables/main_test.go @@ -26,7 +26,6 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } goleak.VerifyTestMain(m, opts...) 
} diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index a2b3c710e36dd..f6c9fa027247a 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -15,13 +15,14 @@ package tables import ( + "strconv" "context" "fmt" "sync" "time" "github.com/pingcap/errors" - "github.com/pingcap/tidb/kv" + // "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" @@ -61,13 +62,13 @@ type StateRemote interface { Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) // LockForRead try to add a read lock to the table with the specified tableID - LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) + LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(ctx context.Context, tid int64, now, ts uint64) error + LockForWrite(ctx context.Context, tid int64, lease uint64) error // RenewLease attempt to renew the read / write lock on the table with the specified tableID - RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) + RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) } // mockStateRemoteHandle implement the StateRemote interface. 
@@ -75,8 +76,6 @@ type mockStateRemoteHandle struct { ch chan remoteTask } -var _ StateRemote = &mockStateRemoteHandle{} - func (r *mockStateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { op := &loadOP{tid: tid} op.Add(1) @@ -121,10 +120,10 @@ func (r *mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now return op.err } -func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) { +func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) { switch op { case RenewReadLease: - op := &renewLeaseForReadOP{tid: tid, oldTs: oldTs, newTs: newTs} + op := &renewLeaseForReadOP{tid: tid, newTs: newTs} op.Add(1) r.ch <- op op.Wait() @@ -203,7 +202,6 @@ type renewLeaseForReadOP struct { sync.WaitGroup // Input tid int64 - oldTs uint64 newTs uint64 // Output @@ -212,7 +210,7 @@ type renewLeaseForReadOP struct { } func (op *renewLeaseForReadOP) Exec(r *mockStateRemoteData) { - op.succ, op.err = r.renewLeaseForRead(op.tid, op.oldTs, op.newTs) + op.succ, op.err = r.renewLeaseForRead(op.tid, op.newTs) op.Done() } @@ -325,7 +323,7 @@ func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, e return 0, nil } -func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs uint64) (bool, error) { +func (r *mockStateRemoteData) renewLeaseForRead(tid int64, newTs uint64) (bool, error) { record, ok := r.data[tid] if !ok { record = &stateRecord{ @@ -338,9 +336,6 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs u if record.lockType != CachedTableLockRead { return false, errors.New("The read lock can be renewed only in the read lock state") } - if record.lockLease < oldTs { - return false, errors.New("The remote Lease is invalid") - } if record.lockLease <= newTs { record.lockLease = newTs return true, nil @@ -349,9 +344,8 
@@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs u } type sqlExec interface { - AffectedRows() uint64 ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error) - GetStore() kv.Storage + // GetStore() kv.Storage } type stateRemoteHandle struct { @@ -373,11 +367,11 @@ func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLoc return lockType, lease, err } -func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) { +func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint64) ( /*succ*/ bool, error) { h.Lock() defer h.Unlock() succ := false - err := h.runInTxn(ctx, func(ctx context.Context) error { + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) @@ -385,6 +379,7 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts // The old lock is outdated, clear orphan lock. if now > lease { succ = true + fmt.Println("lock for read .... 
now=", now, "lease=", lease, ts) if err := h.updateRow(ctx, tid, "READ", ts); err != nil { return errors.Trace(err) } @@ -407,112 +402,130 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts return succ, err } -func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error { h.Lock() defer h.Unlock() for { - retry, err := h.lockForWriteOnce(ctx, tid, now, ts) + waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts) if err != nil { return err } - if !retry { + if waitAndRetry == 0 { break } - store := h.exec.GetStore() - o := store.GetOracle() - newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) - if err != nil { - return errors.Trace(err) - } - now, ts = newTS, leaseFromTS(newTS) + fmt.Println("lock for write sleep and retry ===", waitAndRetry) + time.Sleep(waitAndRetry) + // store := h.exec.GetStore() + // o := store.GetOracle() + // newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) + // if err != nil { + // return errors.Trace(err) + // } + // now, ts = newTS, leaseFromTS(newTS) } return nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) ( /*retry*/ bool, error) { - err := h.beginTxn(ctx) - if err != nil { - return false, errors.Trace(err) - } - defer func() { +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) { + err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { - fmt.Println("defer lockForWriteOnce, err ==", err) - h.rollbackTxn(ctx) + return errors.Trace(err) } - }() - - lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) - if err != nil { - return false, errors.Trace(err) - } - // The lease is outdated, so lock is invalid, clear 
orphan lock of any kind. - if now > lease { - if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return false, errors.Trace(err) + // The lease is outdated, so lock is invalid, clear orphan lock of any kind. + if now > lease { + fmt.Println("old lock outdated ... now = ", now, " lease=", lease) + if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + return nil } - return false, h.commitTxn(ctx) - } - // The lease is valid. - switch lockType { - case CachedTableLockNone: - if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return false, errors.Trace(err) - } - return false, h.commitTxn(ctx) - case CachedTableLockRead: - // Change from READ to INTEND - if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil { - return false, errors.Trace(err) - } - if err = h.commitTxn(ctx); err != nil { - return false, errors.Trace(err) - } + // The lease is valid. + switch lockType { + case CachedTableLockNone: + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + case CachedTableLockRead: + // Change from READ to INTEND + if _, err = h.execSQL(ctx, + "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", + lease, + ts, + tid); err != nil { + return errors.Trace(err) + } + if err = h.commitTxn(ctx); err != nil { + return errors.Trace(err) + } - // Wait for lease to expire, and then retry. - waitForLeaseExpire(oldReadLease, now) - return true, nil - case CachedTableLockIntend, CachedTableLockWrite: - // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. - if now > oldReadLease { - if lockType == CachedTableLockIntend { - if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return false, errors.Trace(err) + // Wait for lease to expire, and then retry. 
+ waitAndRetry = waitForLeaseExpire(lease, now) + fmt.Println("lease ===", lease, "now ===", now, "wait ===", waitAndRetry) + case CachedTableLockIntend: + // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. + if now > oldReadLease { + if lockType == CachedTableLockIntend { + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } } + return nil } - return false, h.commitTxn(ctx) + // Otherwise, the WRITE should wait for the READ lease expire. + // And then retry changing the lock to WRITE + waitAndRetry = waitForLeaseExpire(oldReadLease, now) } + return nil + }) - // Otherwise, the WRITE should wait for the READ lease expire. - fmt.Println("rollback txn wait for read lease...") - h.rollbackTxn(ctx) - waitForLeaseExpire(oldReadLease, now) - // And then retry change the lock to WRITE - return true, nil - } - return false, errors.New("should never run here") + return } -func waitForLeaseExpire(oldReadLease, now uint64) { +func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { if oldReadLease >= now { t1 := oracle.GetTimeFromTS(oldReadLease) t2 := oracle.GetTimeFromTS(now) waitDuration := t1.Sub(t2) - fmt.Println("wait for lease expirte ===", waitDuration) - time.Sleep(waitDuration) + return waitDuration } + return 0 } -func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, now, ts uint64, _ RenewLeaseType) (bool, error) { +func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease uint64, op RenewLeaseType) (bool, error) { h.Lock() defer h.Unlock() - _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? 
and lock_type ='READ'", ts, tid) - if err != nil { - return false, errors.Trace(err) + + var succ bool + if op == RenewReadLease { + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) + if now >= oldLease { + fmt.Println("now=", now, "oldLease=", oldLease, "newLease=", newLease) + // read lock had already expired, fail to renew + return nil + } + if lockType != CachedTableLockRead { + // Not read lock, fail to renew + return nil + } + + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "READ", newLease) + if err != nil { + return errors.Trace(err) + } + } + succ = true + return nil + }) + return succ, err } - succ := h.exec.AffectedRows() > 0 - return succ, err + + // TODO: renew for write lease + return false, errors.New("not implement yet") } func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { @@ -533,13 +546,23 @@ func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { return err } -func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context) error) error { +func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context, txnTS uint64) error) error { err := h.beginTxn(ctx) if err != nil { return errors.Trace(err) } - err = fn(ctx) + rows, err := h.execSQL(ctx, "select @@tidb_current_ts") + if err != nil { + return errors.Trace(err) + } + resultStr := rows[0].GetString(0) + txnTS, err := strconv.ParseUint(resultStr, 10, 64) + if err != nil { + return errors.Trace(err) + } + + err = fn(ctx, txnTS) if err != nil { h.rollbackTxn(ctx) fmt.Println("==== rollback exec error ===", err) diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index 969f0fb56b0e0..c27b952573452 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -15,11 +15,14 @@ package tables_test import ( + "time" "context" "testing" 
"github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/table/tables" + "github.com/tikv/client-go/v2/oracle" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) @@ -64,65 +67,75 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lockType.String(), "NONE") require.Equal(t, lease, uint64(0)) + ts, err := se.GetStore().GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) + require.NoError(t, err) + physicalTime := oracle.GetTimeFromTS(ts) + leaseVal := oracle.GoTimeToTS(physicalTime.Add(200*time.Millisecond)) + // Check read lock. - succ, err := h.LockForRead(ctx, 5, 1234, 1234) + succ, err := h.LockForRead(ctx, 5, leaseVal) require.NoError(t, err) require.True(t, succ) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockRead) require.Equal(t, lockType.String(), "READ") - require.Equal(t, lease, uint64(1234)) + require.Equal(t, lease, leaseVal) // LockForRead when read lock is hold. // This operation equals to renew lease. - succ, err = h.LockForRead(ctx, 5, 1235, 1235) + succ, err = h.LockForRead(ctx, 5, leaseVal + 1) require.NoError(t, err) require.True(t, succ) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockRead) require.Equal(t, lockType.String(), "READ") - require.Equal(t, lease, uint64(1235)) + require.Equal(t, lease, leaseVal + 1) // Renew read lock lease operation. 
- succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(400*time.Millisecond)) + succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease) require.NoError(t, err) require.True(t, succ) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockRead) require.Equal(t, lockType.String(), "READ") - require.Equal(t, lease, uint64(1264)) + require.Equal(t, lease, leaseVal) // Check write lock. - require.NoError(t, h.LockForWrite(ctx, 5, 2234, 2234)) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(700*time.Millisecond)) + require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, lease, uint64(2234)) + require.Equal(t, lease, leaseVal) - // Lock for write again - require.NoError(t, h.LockForWrite(ctx, 5, 3234, 3234)) + // // Lock for write again + leaseVal = oracle.GoTimeToTS(physicalTime.Add(800*time.Millisecond)) + require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, lease, uint64(3234)) + // require.Equal(t, lease, leaseVal) // Renew read lock lease should fail when the write lock is hold. - succ, err = h.RenewLease(ctx, 5, 0, 1264, tables.RenewReadLease) + succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease) require.NoError(t, err) require.False(t, succ) // Acquire read lock should also fail when the write lock is hold. - succ, err = h.LockForRead(ctx, 5, 1264, 1264) + succ, err = h.LockForRead(ctx, 5, leaseVal) require.NoError(t, err) require.False(t, succ) // But clear orphan write lock should success. 
- succ, err = h.LockForRead(ctx, 5, 4234, 4234) + time.Sleep(time.Second) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(2*time.Second)) + succ, err = h.LockForRead(ctx, 5, leaseVal) require.NoError(t, err) require.True(t, succ) } diff --git a/telemetry/main_test.go b/telemetry/main_test.go index 3ab35518fd644..f498d16a2d564 100644 --- a/telemetry/main_test.go +++ b/telemetry/main_test.go @@ -32,7 +32,6 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - goleak.IgnoreTopFunction("github.com/pingcap/tidb/table/tables.mockRemoteService"), } goleak.VerifyTestMain(m, opts...) From aaf7d5bd4ade0670b581eb5d6b22ee6dc3bfff18 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 23 Nov 2021 20:47:15 +0800 Subject: [PATCH 03/13] fmt & cleanup --- ddl/ddl_api.go | 12 +-------- domain/domain.go | 2 +- infoschema/builder.go | 8 +++--- session/bootstrap.go | 1 + table/table.go | 2 +- table/tables/cache.go | 5 +--- table/tables/cache_test.go | 22 +++------------- table/tables/state_remote.go | 42 +++++++++++-------------------- table/tables/state_remote_test.go | 23 ++++++++--------- 9 files changed, 38 insertions(+), 79 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 5e9f4549f0403..1d2607f8dcb34 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/ddl/label" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -58,6 +57,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/set" + "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" ) @@ -6630,16 +6630,6 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err 
error) } _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) - - // exec := ctx.(sqlexec.RestrictedSQLExecutor) - // stmt, err := exec.ParseWithParams(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) - // if err != nil { - // return errors.Trace(err) - // } - // _, _, err = exec.ExecRestrictedStmt(context.Background(), stmt) - if err != nil { - return errors.Trace(err) - } return err } diff --git a/domain/domain.go b/domain/domain.go index d5f27ab453e09..0a037716696ae 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -95,7 +95,7 @@ type Domain struct { isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. renewLeaseCh chan func() // It is used to call the renewLease function of the cache table. onClose func() - sysFactory func(*Domain) (pools.Resource, error) + sysFactory func(*Domain) (pools.Resource, error) } // loadInfoSchema loads infoschema at startTS. diff --git a/infoschema/builder.go b/infoschema/builder.go index 58baa6d27eec2..70ca84f5dd6c6 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -20,9 +20,9 @@ import ( "sort" "strings" + "github.com/ngaut/pools" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/ngaut/pools" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" @@ -33,8 +33,8 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" - "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/domainutil" + "github.com/pingcap/tidb/util/sqlexec" ) // Builder builds a new InfoSchema. 
@@ -45,7 +45,7 @@ type Builder struct { store kv.Storage // TODO: renewLeaseCh is only used to pass data between table and domain renewLeaseCh chan func() - factory func() (pools.Resource, error) + factory func() (pools.Resource, error) } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -693,7 +693,7 @@ func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Res sortedTablesBuckets: make([]sortedTables, bucketCount), }, renewLeaseCh: renewCh, - factory: factory, + factory: factory, } } diff --git a/session/bootstrap.go b/session/bootstrap.go index 763832adadd31..f14db0f9a700e 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -624,6 +624,7 @@ var ( upgradeToVer76, upgradeToVer77, upgradeToVer78, + upgradeToVer79, } ) diff --git a/table/table.go b/table/table.go index b78b7b6abb0f4..1c38ed4335ca6 100644 --- a/table/table.go +++ b/table/table.go @@ -24,12 +24,12 @@ import ( "github.com/opentracing/opentracing-go" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/sqlexec" ) // Type is used to distinguish between different tables that store data in different ways. diff --git a/table/tables/cache.go b/table/tables/cache.go index 34e699faefbb3..975eaad78ac56 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -15,11 +15,10 @@ package tables import ( - "fmt" "context" + "go.uber.org/zap" "sync/atomic" "time" - "go.uber.org/zap" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -82,7 +81,6 @@ func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) { func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { tmp := c.cacheData.Load() if tmp == nil { - fmt.Println("TryReadFromCache return nil because ... 
data not loaded") return nil } data := tmp.(*cacheData) @@ -96,7 +94,6 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { } return data.MemBuffer } - fmt.Println("TryReadFromCache return nil because ... ts not correct...", ts, data.Start, data.Lease) return nil } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index f78bbdd535e3d..a037d95395697 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,10 +15,9 @@ package tables_test import ( - "fmt" + "context" "testing" "time" - "context" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" @@ -174,21 +173,18 @@ func TestCacheCondition(t *testing.T) { // Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id > 0") - fmt.Println("======= update 1") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id = 2") - fmt.Println("======= update 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Delete should not trigger cache. 
- fmt.Println("======= delete ") tk.MustExec("delete from t2 where id = 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) @@ -226,7 +222,7 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { break } } - require.True(t, i<10) + require.True(t, i < 10) tk.MustExec("use test") tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") @@ -247,7 +243,6 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { func TestCacheTableComplexRead(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() - doneCh := make(chan struct{}, 1) tk1 := testkit.NewTestKit(t, store) tk2 := testkit.NewTestKit(t, store) tk1.MustExec("use test") @@ -266,27 +261,16 @@ func TestCacheTableComplexRead(t *testing.T) { require.True(t, i < 10) tk1.MustExec("begin") - // go func() { - // defer func() { - // if r := recover(); r != nil { - // fmt.Println("xxxx", r) - // } - // }() tk2.MustExec("begin") tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - fmt.Println("run here 1111111111111111111111") for i = 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) if tk2.HasPlan("select * from complex_cache where id > 7", "UnionScan") { break } - fmt.Println("run here 2222222222222222222222222") } require.True(t, i < 10) tk2.MustExec("commit") - doneCh <- struct{}{} - // }() - // <-doneCh tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") tk1.MustExec("commit") @@ -441,7 +425,7 @@ func TestRenewLease(t *testing.T) { tk1 := testkit.NewTestKit(t, store) remote := tables.NewStateRemote(tk1.Session()) var leaseBefore uint64 - for i =0; i<20; i++ { + for i = 0; i < 20; i++ { time.Sleep(200 * time.Millisecond) lockType, lease, err := remote.Load(context.Background(), tbl.Meta().ID) require.NoError(t, err) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index f6c9fa027247a..757a0b3d07a18 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -15,14 +15,14 @@ package 
tables import ( - "strconv" "context" "fmt" + "strconv" "sync" "time" "github.com/pingcap/errors" - // "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" @@ -224,11 +224,11 @@ type stateRecord struct { lockType CachedTableLockType } -// func newMockStateRemoteData() *mockStateRemoteData { -// return &mockStateRemoteData{ -// data: make(map[int64]*stateRecord), -// } -// } +func newMockStateRemoteData() *mockStateRemoteData { + return &mockStateRemoteData{ + data: make(map[int64]*stateRecord), + } +} func (r *mockStateRemoteData) Load(tid int64) (CachedTableLockType, uint64, error) { record, ok := r.data[tid] @@ -323,7 +323,7 @@ func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, e return 0, nil } -func (r *mockStateRemoteData) renewLeaseForRead(tid int64, newTs uint64) (bool, error) { +func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs uint64) (bool, error) { record, ok := r.data[tid] if !ok { record = &stateRecord{ @@ -336,6 +336,9 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, newTs uint64) (bool, if record.lockType != CachedTableLockRead { return false, errors.New("The read lock can be renewed only in the read lock state") } + if record.lockLease < oldTs { + return false, errors.New("The remote Lease is invalid") + } if record.lockLease <= newTs { record.lockLease = newTs return true, nil @@ -345,7 +348,6 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, newTs uint64) (bool, type sqlExec interface { ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error) - // GetStore() kv.Storage } type stateRemoteHandle struct { @@ -379,7 +381,6 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 // The old lock is outdated, clear orphan lock. if now > lease { succ = true - fmt.Println("lock for read .... 
now=", now, "lease=", lease, ts) if err := h.updateRow(ctx, tid, "READ", ts); err != nil { return errors.Trace(err) } @@ -413,16 +414,7 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint if waitAndRetry == 0 { break } - - fmt.Println("lock for write sleep and retry ===", waitAndRetry) time.Sleep(waitAndRetry) - // store := h.exec.GetStore() - // o := store.GetOracle() - // newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) - // if err != nil { - // return errors.Trace(err) - // } - // now, ts = newTS, leaseFromTS(newTS) } return nil } @@ -435,7 +427,6 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts } // The lease is outdated, so lock is invalid, clear orphan lock of any kind. if now > lease { - fmt.Println("old lock outdated ... now = ", now, " lease=", lease) if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { return errors.Trace(err) } @@ -463,7 +454,6 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts // Wait for lease to expire, and then retry. waitAndRetry = waitForLeaseExpire(lease, now) - fmt.Println("lease ===", lease, "now ===", now, "wait ===", waitAndRetry) case CachedTableLockIntend: // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. 
if now > oldReadLease { @@ -502,8 +492,10 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease if op == RenewReadLease { err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, oldLease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } if now >= oldLease { - fmt.Println("now=", now, "oldLease=", oldLease, "newLease=", newLease) // read lock had already expired, fail to renew return nil } @@ -530,19 +522,16 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "begin") - fmt.Printf("EEEEEEEEEEEEEEEE begin ...%p...\n", h.exec) return err } func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "commit") - fmt.Printf("EEEEEEEEEEEEEEEE commit %v...%p ...\n", err, h.exec) return err } func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "rollback") - fmt.Println("EEEEEEEEEEEEEEEE rollback") return err } @@ -564,8 +553,7 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co err = fn(ctx, txnTS) if err != nil { - h.rollbackTxn(ctx) - fmt.Println("==== rollback exec error ===", err) + terror.Log(h.rollbackTxn(ctx)) return errors.Trace(err) } diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index c27b952573452..00f280cad2028 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -15,16 +15,16 @@ package tables_test import ( - "time" "context" "testing" + "time" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/table/tables" - "github.com/tikv/client-go/v2/oracle" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) // CreateMetaLockForCachedTable initializes the 
cached table meta lock information. @@ -70,7 +70,7 @@ func TestStateRemote(t *testing.T) { ts, err := se.GetStore().GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) require.NoError(t, err) physicalTime := oracle.GetTimeFromTS(ts) - leaseVal := oracle.GoTimeToTS(physicalTime.Add(200*time.Millisecond)) + leaseVal := oracle.GoTimeToTS(physicalTime.Add(200 * time.Millisecond)) // Check read lock. succ, err := h.LockForRead(ctx, 5, leaseVal) @@ -84,17 +84,17 @@ func TestStateRemote(t *testing.T) { // LockForRead when read lock is hold. // This operation equals to renew lease. - succ, err = h.LockForRead(ctx, 5, leaseVal + 1) + succ, err = h.LockForRead(ctx, 5, leaseVal+1) require.NoError(t, err) require.True(t, succ) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockRead) require.Equal(t, lockType.String(), "READ") - require.Equal(t, lease, leaseVal + 1) + require.Equal(t, lease, leaseVal+1) // Renew read lock lease operation. - leaseVal = oracle.GoTimeToTS(physicalTime.Add(400*time.Millisecond)) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(400 * time.Millisecond)) succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease) require.NoError(t, err) require.True(t, succ) @@ -105,7 +105,7 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lease, leaseVal) // Check write lock. 
- leaseVal = oracle.GoTimeToTS(physicalTime.Add(700*time.Millisecond)) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(700 * time.Millisecond)) require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) @@ -114,13 +114,12 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lease, leaseVal) // // Lock for write again - leaseVal = oracle.GoTimeToTS(physicalTime.Add(800*time.Millisecond)) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(800 * time.Millisecond)) require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) - lockType, lease, err = h.Load(ctx, 5) + lockType, _, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") - // require.Equal(t, lease, leaseVal) // Renew read lock lease should fail when the write lock is hold. succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease) @@ -134,7 +133,7 @@ func TestStateRemote(t *testing.T) { // But clear orphan write lock should success. 
time.Sleep(time.Second) - leaseVal = oracle.GoTimeToTS(physicalTime.Add(2*time.Second)) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(2 * time.Second)) succ, err = h.LockForRead(ctx, 5, leaseVal) require.NoError(t, err) require.True(t, succ) From 56cbd37b24d3d846b9c817343fe2c188dfeae9fa Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 23 Nov 2021 21:02:56 +0800 Subject: [PATCH 04/13] fix CI --- table/tables/state_remote.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 757a0b3d07a18..9e9cf5eee64e5 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -120,10 +120,10 @@ func (r *mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now return op.err } -func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) { +func (r *mockStateRemoteHandle) RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) { switch op { case RenewReadLease: - op := &renewLeaseForReadOP{tid: tid, newTs: newTs} + op := &renewLeaseForReadOP{tid: tid, oldTs: oldTs, newTs: newTs} op.Add(1) r.ch <- op op.Wait() @@ -202,6 +202,7 @@ type renewLeaseForReadOP struct { sync.WaitGroup // Input tid int64 + oldTs uint64 newTs uint64 // Output @@ -210,7 +211,7 @@ type renewLeaseForReadOP struct { } func (op *renewLeaseForReadOP) Exec(r *mockStateRemoteData) { - op.succ, op.err = r.renewLeaseForRead(op.tid, op.newTs) + op.succ, op.err = r.renewLeaseForRead(op.tid, op.oldTs, op.newTs) op.Done() } From d579c585b9f1250d3a40b2f6a222c6b23224a01d Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 29 Nov 2021 10:26:10 +0800 Subject: [PATCH 05/13] *: implement renew write lease for cached table --- executor/infoschema_reader_test.go | 4 +- go.mod | 2 + kv/option.go | 3 + session/session.go | 74 +++++++++++++++++++++++ sessionctx/variable/session.go | 3 + store/driver/txn/txn_driver.go 
| 2 + table/tables/cache.go | 44 +++++--------- table/tables/state_remote.go | 96 ++++++++++++++++++++---------- table/tables/state_remote_test.go | 39 +++++------- 9 files changed, 181 insertions(+), 86 deletions(-) diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go index c92ce7da52495..0744372305cb6 100644 --- a/executor/infoschema_reader_test.go +++ b/executor/infoschema_reader_test.go @@ -944,14 +944,14 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) { Hostname: "localhost", }, nil, nil), Equals, true) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("26")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("27")) c.Assert(tk.Se.Auth(&auth.UserIdentity{ Username: "testuser3", Hostname: "localhost", }, nil, nil), Equals, true) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("26")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("27")) } func (s *testInfoschemaTableSuite) TestSequences(c *C) { diff --git a/go.mod b/go.mod index fe719777fb6cf..cb69f3d294a15 100644 --- a/go.mod +++ b/go.mod @@ -102,3 +102,5 @@ replace github.com/pingcap/tidb/parser => ./parser // fix potential security issue(CVE-2020-26160) introduced by indirect dependency. 
replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible + +replace github.com/tikv/client-go/v2 => ../../tikv/client-go diff --git a/kv/option.go b/kv/option.go index 682c2be4f2d60..be535e00dd658 100644 --- a/kv/option.go +++ b/kv/option.go @@ -68,6 +68,9 @@ const ( KVFilter // SnapInterceptor is used for setting the interceptor for snapshot SnapInterceptor + // CachedTableWriteLease is the write lock lease for cached table, the write lock protect the other TiDB from using cache. + // The transaction commit ts must be greater than the write lock lease value. + CachedTableWriteLease ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index e53c3fe5b4e38..97e3962a124f3 100644 --- a/session/session.go +++ b/session/session.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/util/topsql" "github.com/pingcap/tipb/go-binlog" @@ -90,6 +91,7 @@ import ( "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" tikvstore "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" tikvutil "github.com/tikv/client-go/v2/util" ) @@ -558,10 +560,82 @@ func (s *session) doCommit(ctx context.Context) error { if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 { s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) } + if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 { + c := cachedTableRenewLease{exit: make(chan struct{}), tables: tables} + err := c.start(ctx) + defer c.stop(ctx) + if err != nil { + return errors.Trace(err) + } + s.txn.SetOption(kv.CachedTableWriteLease, &c.lease) + } return s.commitTxnWithTemporaryData(tikvutil.SetSessionID(ctx, 
sessVars.ConnectionID), &s.txn) } +type cachedTableRenewLease struct { + lease uint64 + exit chan struct{} + tables map[int64]interface{} +} + +func (c *cachedTableRenewLease) start(ctx context.Context) error { + var maxLease uint64 + for tid, raw := range c.tables { + handle := raw.(tables.StateRemote) + writeLockLease, err := handle.LockForWrite(ctx, tid) + if err != nil { + return errors.Trace(err) + } + if writeLockLease > maxLease { + maxLease = writeLockLease + } + go c.keepAlive(ctx) + } + atomic.StoreUint64(&c.lease, maxLease) + return nil +} + +func (c *cachedTableRenewLease) keepAlive(ctx context.Context) { + t := time.NewTicker(5 * time.Second) + defer t.Stop() + for { + select { + case <-t.C: + if err := c.renew(ctx); err != nil { + logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", + zap.Error(err)) + return + } + case <-c.exit: + return + } + } +} + +func (c *cachedTableRenewLease) renew(ctx context.Context) error { + newLease := atomic.LoadUint64(&c.lease) + physicalTime := oracle.GetTimeFromTS(newLease) + newLease = oracle.GoTimeToTS(physicalTime.Add(3 * time.Second)) + for tid, raw := range c.tables { + handle := raw.(tables.StateRemote) + succ, err := handle.RenewLease(ctx, tid, newLease, tables.RenewWriteLease) + if err != nil { + return errors.Trace(err) + } + if !succ { + // Don't update the new lease unless all the operation success. 
+ return nil + } + } + atomic.StoreUint64(&c.lease, newLease) + return nil +} + +func (c *cachedTableRenewLease) stop(ctx context.Context) { + close(c.exit) +} + func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transaction) error { sessVars := s.sessionVars txnTempTables := sessVars.TxnCtx.TemporaryTables diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index c395d98c85356..5cc8e8369ed92 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -178,6 +178,9 @@ type TransactionContext struct { // TemporaryTables is used to store transaction-specific information for global temporary tables. // It can also be stored in sessionCtx with local temporary tables, but it's easier to clean this data after transaction ends. TemporaryTables map[int64]tableutil.TempTable + + // CachedTables is not nil if the transaction write on cached table. + CachedTables map[int64]interface{} } // GetShard returns the shard prefix for the next `count` rowids. diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 0c08e9b3f65db..8c77b1941e4c4 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -227,6 +227,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.SetKVFilter(val.(tikv.KVFilter)) case kv.SnapInterceptor: txn.snapshotInterceptor = val.(kv.SnapshotInterceptor) + case kv.CachedTableWriteLease: + txn.KVTxn.SetCachedTableWriteLease(val.(*uint64)) } } diff --git a/table/tables/cache.go b/table/tables/cache.go index 975eaad78ac56..5c5306c0d981c 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -183,45 +183,31 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t } // AddRecord implements the AddRecord method for the table.Table interface. 
-func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { - txn, err := ctx.Txn(true) - if err != nil { - return nil, err +func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + return c.TableCommon.AddRecord(sctx, r, opts...) +} + +func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) { + txnCtx := sctx.GetSessionVars().TxnCtx + if txnCtx.CachedTables == nil { + txnCtx.CachedTables = make(map[int64]interface{}) } - now := txn.StartTS() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) - if err != nil { - return nil, errors.Trace(err) + if _, ok := txnCtx.CachedTables[tid]; !ok { + txnCtx.CachedTables[tid] = handle } - return c.TableCommon.AddRecord(ctx, r, opts...) } // UpdateRecord implements table.Table func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { - txn, err := sctx.Txn(true) - if err != nil { - return err - } - now := txn.StartTS() - err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now)) - if err != nil { - return errors.Trace(err) - } + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) } // RemoveRecord implements table.Table RemoveRecord interface. 
-func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - txn, err := ctx.Txn(true) - if err != nil { - return err - } - now := txn.StartTS() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) - if err != nil { - return errors.Trace(err) - } - return c.TableCommon.RemoveRecord(ctx, h, r) +func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error { + txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + return c.TableCommon.RemoveRecord(sctx, h, r) } func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() { diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 9e9cf5eee64e5..a62f347c983f3 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -16,7 +16,6 @@ package tables import ( "context" - "fmt" "strconv" "sync" "time" @@ -65,7 +64,7 @@ type StateRemote interface { LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(ctx context.Context, tid int64, lease uint64) error + LockForWrite(ctx context.Context, tid int64) (uint64, error) // RenewLease attempt to renew the read / write lock on the table with the specified tableID RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) @@ -404,28 +403,32 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 return succ, err } -func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error { +// LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. 
+func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) { h.Lock() defer h.Unlock() + var ret uint64 for { - waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts) + waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid) if err != nil { - return err + return 0, err } if waitAndRetry == 0 { + ret = lease break } time.Sleep(waitAndRetry) } - return nil + return ret, nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) { +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (waitAndRetry time.Duration, ts uint64, err error) { err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } + ts = leaseFromTS(now) // The lease is outdated, so lock is invalid, clear orphan lock of any kind. if now > lease { if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { @@ -489,36 +492,69 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease h.Lock() defer h.Unlock() + switch op { + case RenewReadLease: + return h.renewReadLease(ctx, tid, newLease) + case RenewWriteLease: + return h.renewWriteLease(ctx, tid, newLease) + } + return false, errors.New("wrong renew lease type") +} + +func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { var succ bool - if op == RenewReadLease { - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + if now >= oldLease { + // read lock had already expired, fail to renew + return nil + } + if lockType != CachedTableLockRead { + // Not read lock, fail to renew + return nil + } + + if 
newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "READ", newLease) if err != nil { return errors.Trace(err) } - if now >= oldLease { - // read lock had already expired, fail to renew - return nil - } - if lockType != CachedTableLockRead { - // Not read lock, fail to renew - return nil - } + } + succ = true + return nil + }) + return succ, err +} - if newLease > oldLease { // lease should never decrease! - err = h.updateRow(ctx, tid, "READ", newLease) - if err != nil { - return errors.Trace(err) - } - } - succ = true +func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { + var succ bool + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + if now >= oldLease { + // write lock had already expired, fail to renew return nil - }) - return succ, err - } + } + if lockType != CachedTableLockWrite { + // Not write lock, fail to renew + return nil + } - // TODO: renew for write lease - return false, errors.New("not implement yet") + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "WRITE", newLease) + if err != nil { + return errors.Trace(err) + } + } + succ = true + return nil + }) + return succ, err } func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index 00f280cad2028..2f25f937dc379 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -27,18 +27,6 @@ import ( "github.com/tikv/client-go/v2/oracle" ) -// CreateMetaLockForCachedTable initializes the cached table meta lock information. 
-func createMetaLockForCachedTable(h session.Session) error { - createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + - "`tid` int(11) NOT NULL DEFAULT 0," + - "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + - "`lease` bigint(20) NOT NULL DEFAULT 0," + - "`oldReadLease` bigint(20) NOT NULL DEFAULT 0," + - "PRIMARY KEY (`tid`))" - _, err := h.ExecuteInternal(context.Background(), createTable) - return err -} - // InitRow add a new record into the cached table meta lock table. func initRow(ctx context.Context, exec session.Session, tid int) error { _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", tid) @@ -55,9 +43,6 @@ func TestStateRemote(t *testing.T) { ctx := context.Background() h := tables.NewStateRemote(se) - err := createMetaLockForCachedTable(se) - require.NoError(t, err) - require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) // Check the initial value. require.NoError(t, initRow(ctx, se, 5)) @@ -105,17 +90,17 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lease, leaseVal) // Check write lock. 
- leaseVal = oracle.GoTimeToTS(physicalTime.Add(700 * time.Millisecond)) - require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + writeLease, err := h.LockForWrite(ctx, 5) + require.NoError(t, err) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, lease, leaseVal) + require.Greater(t, writeLease, leaseVal) - // // Lock for write again - leaseVal = oracle.GoTimeToTS(physicalTime.Add(800 * time.Millisecond)) - require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + // Lock for write again + writeLease, err = h.LockForWrite(ctx, 5) + require.NoError(t, err) lockType, _, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) @@ -131,10 +116,14 @@ func TestStateRemote(t *testing.T) { require.NoError(t, err) require.False(t, succ) - // But clear orphan write lock should success. - time.Sleep(time.Second) - leaseVal = oracle.GoTimeToTS(physicalTime.Add(2 * time.Second)) - succ, err = h.LockForRead(ctx, 5, leaseVal) + // Renew write lease. 
+ succ, err = h.RenewLease(ctx, 5, writeLease+1, tables.RenewWriteLease) require.NoError(t, err) require.True(t, succ) + + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, writeLease+1) } From 3246ed724132c5cf5087f52260ee6b7899d0a70c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 10 Dec 2021 21:10:55 +0800 Subject: [PATCH 06/13] clean up code --- go.mod | 2 +- go.sum | 2 + table/tables/state_remote.go | 139 ----------------------------------- 3 files changed, 3 insertions(+), 140 deletions(-) diff --git a/go.mod b/go.mod index 65c1b291124c4..2627db99d4a9d 100644 --- a/go.mod +++ b/go.mod @@ -103,4 +103,4 @@ replace github.com/pingcap/tidb/parser => ./parser // fix potential security issue(CVE-2020-26160) introduced by indirect dependency. replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible -replace github.com/tikv/client-go/v2 => ../../tikv/client-go +replace github.com/tikv/client-go/v2 => github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211210124541-6f2bc98ad5ae diff --git a/go.sum b/go.sum index 1ab204f14388d..c7ae5d6bec242 100644 --- a/go.sum +++ b/go.sum @@ -707,6 +707,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJ github.com/thoas/go-funk v0.8.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= +github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211210124541-6f2bc98ad5ae h1:rje74mRxCOOe/Nbxr3VE55dm8622Rs4U++FIH8QPfh4= +github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211210124541-6f2bc98ad5ae/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= 
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index f3377e8f31833..5086b6bf37d34 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -360,144 +360,5 @@ func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...int if rs != nil { return sqlexec.DrainRecordSet(ctx, rs, 1) } -// <<<<<<< HEAD -// return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") -// } - -// type sqlExec interface { -// ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error) -// } - -// type stateRemoteHandle struct { -// exec sqlExec -// sync.Mutex -// } - -// // NewStateRemote creates a StateRemote object. -// func NewStateRemote(exec sqlExec) *stateRemoteHandle { -// return &stateRemoteHandle{ -// exec: exec, -// } -// } - -// var _ StateRemote = &stateRemoteHandle{} - -// func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { -// lockType, lease, _, err := h.loadRow(ctx, tid) -// return lockType, lease, err -// } - -// func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint64) ( /*succ*/ bool, error) { -// h.Lock() -// defer h.Unlock() -// succ := false -// err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { -// lockType, lease, _, err := h.loadRow(ctx, tid) -// if err != nil { -// return errors.Trace(err) -// } -// // The old lock is outdated, clear orphan lock. 
-// if now > lease { -// succ = true -// if err := h.updateRow(ctx, tid, "READ", ts); err != nil { -// return errors.Trace(err) -// } -// return nil -// } - -// switch lockType { -// case CachedTableLockNone: -// case CachedTableLockRead: -// case CachedTableLockWrite, CachedTableLockIntend: -// return nil -// } -// succ = true -// if err := h.updateRow(ctx, tid, "READ", ts); err != nil { -// return errors.Trace(err) -// } - -// return nil -// }) -// return succ, err -// } - - - - - -// func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { -// _, err := h.execSQL(ctx, "begin") -// return err -// } - -// func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { -// _, err := h.execSQL(ctx, "commit") -// return err -// } - -// func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { -// _, err := h.execSQL(ctx, "rollback") -// return err -// } - -// func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context, txnTS uint64) error) error { -// err := h.beginTxn(ctx) -// if err != nil { -// return errors.Trace(err) -// } - -// rows, err := h.execSQL(ctx, "select @@tidb_current_ts") -// if err != nil { -// return errors.Trace(err) -// } -// resultStr := rows[0].GetString(0) -// txnTS, err := strconv.ParseUint(resultStr, 10, 64) -// if err != nil { -// return errors.Trace(err) -// } - -// err = fn(ctx, txnTS) -// if err != nil { -// terror.Log(h.rollbackTxn(ctx)) -// return errors.Trace(err) -// } - -// return h.commitTxn(ctx) -// } - -// func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) { -// chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? 
for update", tid) -// if err != nil { -// return 0, 0, 0, errors.Trace(err) -// } -// if len(chunkRows) != 1 { -// return 0, 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid) -// } -// col1 := chunkRows[0].GetEnum(0) -// // Note, the MySQL enum value start from 1 rather than 0 -// lockType := CachedTableLockType(col1.Value - 1) -// lease := chunkRows[0].GetUint64(1) -// oldReadLease := chunkRows[0].GetUint64(2) -// return lockType, lease, oldReadLease, nil -// } - -// func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int64, lockType string, lease uint64) error { -// _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lock_type = %?, lease = %? where tid = %?", lockType, lease, tid) -// return err -// } - -// func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { -// rs, err := h.exec.ExecuteInternal(ctx, sql, args...) -// if rs != nil { -// defer rs.Close() -// } -// if err != nil { -// return nil, errors.Trace(err) -// } -// if rs != nil { -// return sqlexec.DrainRecordSet(ctx, rs, 1) -// } -// ======= -// >>>>>>> master return nil, nil } From 6e247228da33fa920c5da1fa06e134fd84870701 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 11 Dec 2021 09:49:08 +0800 Subject: [PATCH 07/13] update lease for write on multiple cached tables --- go.mod | 2 +- go.sum | 2 + session/session.go | 77 ++++++++++++++++++---------------- store/driver/txn/txn_driver.go | 2 +- table/tables/state_remote.go | 2 - 5 files changed, 46 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 2627db99d4a9d..fbed13d84616b 100644 --- a/go.mod +++ b/go.mod @@ -103,4 +103,4 @@ replace github.com/pingcap/tidb/parser => ./parser // fix potential security issue(CVE-2020-26160) introduced by indirect dependency. 
replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible -replace github.com/tikv/client-go/v2 => github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211210124541-6f2bc98ad5ae +replace github.com/tikv/client-go/v2 => github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211211014557-6e8f369b632c diff --git a/go.sum b/go.sum index c7ae5d6bec242..d6fb16b08d8e9 100644 --- a/go.sum +++ b/go.sum @@ -709,6 +709,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211210124541-6f2bc98ad5ae h1:rje74mRxCOOe/Nbxr3VE55dm8622Rs4U++FIH8QPfh4= github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211210124541-6f2bc98ad5ae/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= +github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211211014557-6e8f369b632c h1:YCZvDxjMZn5hChfbaHB6Tj1ifvUc2YqlneXoesi2GC0= +github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211211014557-6e8f369b632c/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/session/session.go b/session/session.go index bb5c46bc553cf..8bb0f64f2ca94 100644 --- a/session/session.go +++ b/session/session.go @@ -561,52 +561,63 @@ func (s *session) doCommit(ctx context.Context) error { s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) } if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 { - c := cachedTableRenewLease{exit: make(chan struct{}), tables: tables} + c := cachedTableRenewLease{tables: tables} now := time.Now() err := c.start(ctx) - 
sessVars.StmtCtx.WaitLockLeaseTime += time.Since(now) defer c.stop(ctx) + sessVars.StmtCtx.WaitLockLeaseTime += time.Since(now) if err != nil { return errors.Trace(err) } - s.txn.SetOption(kv.CachedTableWriteLease, &c.lease) + s.txn.SetOption(kv.CachedTableWriteLease, c.lease) } return s.commitTxnWithTemporaryData(tikvutil.SetSessionID(ctx, sessVars.ConnectionID), &s.txn) } type cachedTableRenewLease struct { - lease uint64 - exit chan struct{} tables map[int64]interface{} + lease []uint64 // Lease for each visited cached tables. + exit chan struct{} } func (c *cachedTableRenewLease) start(ctx context.Context) error { - var maxLease uint64 + c.exit = make(chan struct{}) + c.lease = make([]uint64, len(c.tables)) + wg := make(chan error) + ith := 0 for tid, raw := range c.tables { - handle := raw.(tables.StateRemote) - writeLockLease, err := handle.LockForWrite(ctx, tid) - if err != nil { - return errors.Trace(err) - } - if writeLockLease > maxLease { - maxLease = writeLockLease + go c.keepAlive(ctx, wg, raw.(tables.StateRemote), tid, &c.lease[ith]) + ith++ + } + + // Wait for all LockForWrite() return, this function can return. 
+ var err error + for ; ith > 0; ith-- { + tmp := <-wg + if tmp != nil { + err = tmp } - go c.keepAlive(ctx) } - atomic.StoreUint64(&c.lease, maxLease) - return nil + return err } -func (c *cachedTableRenewLease) keepAlive(ctx context.Context) { +func (c *cachedTableRenewLease) keepAlive(ctx context.Context, wg chan error, handle tables.StateRemote, tid int64, leasePtr *uint64) { + writeLockLease, err := handle.LockForWrite(ctx, tid) + atomic.StoreUint64(leasePtr, writeLockLease) + wg <- err + if err != nil { + logutil.Logger(ctx).Warn("[cached table] lock for write lock fail", zap.Error(err)) + return + } + t := time.NewTicker(5 * time.Second) defer t.Stop() for { select { case <-t.C: - if err := c.renew(ctx); err != nil { - logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", - zap.Error(err)) + if err := c.renew(ctx, handle, tid, leasePtr); err != nil { + logutil.Logger(ctx).Warn("[cached table] renew write lock lease fail", zap.Error(err)) return } case <-c.exit: @@ -615,22 +626,18 @@ func (c *cachedTableRenewLease) keepAlive(ctx context.Context) { } } -func (c *cachedTableRenewLease) renew(ctx context.Context) error { - newLease := atomic.LoadUint64(&c.lease) - physicalTime := oracle.GetTimeFromTS(newLease) - newLease = oracle.GoTimeToTS(physicalTime.Add(3 * time.Second)) - for tid, raw := range c.tables { - handle := raw.(tables.StateRemote) - succ, err := handle.RenewLease(ctx, tid, newLease, tables.RenewWriteLease) - if err != nil { - return errors.Trace(err) - } - if !succ { - // Don't update the new lease unless all the operation success. 
- return nil - } +func (c *cachedTableRenewLease) renew(ctx context.Context, handle tables.StateRemote, tid int64, leasePtr *uint64) error { + oldLease := atomic.LoadUint64(leasePtr) + physicalTime := oracle.GetTimeFromTS(oldLease) + newLease := oracle.GoTimeToTS(physicalTime.Add(3 * time.Second)) + + succ, err := handle.RenewLease(ctx, tid, newLease, tables.RenewWriteLease) + if err != nil { + return errors.Trace(err) + } + if succ { + atomic.StoreUint64(leasePtr, newLease) } - atomic.StoreUint64(&c.lease, newLease) return nil } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 237ba500c8ee0..dca2d19494a80 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -231,7 +231,7 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { case kv.SnapInterceptor: txn.snapshotInterceptor = val.(kv.SnapshotInterceptor) case kv.CachedTableWriteLease: - txn.KVTxn.SetCachedTableWriteLease(val.(*uint64)) + txn.KVTxn.SetCachedTableWriteLease(val.([]uint64)) } } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 5086b6bf37d34..30027537d7494 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -132,7 +132,6 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 return succ, err } - // LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. 
func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) { h.Lock() @@ -218,7 +217,6 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { return 0 } - func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease uint64, op RenewLeaseType) (bool, error) { h.Lock() defer h.Unlock() From 0df26a2e354a024085bd9796a41439ca33983426 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 11 Dec 2021 10:14:02 +0800 Subject: [PATCH 08/13] add a test case for write on multiple cached table --- session/session_test.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/session/session_test.go b/session/session_test.go index 130aa895c01f2..95440ca7583bf 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5889,3 +5889,35 @@ func (s *testSessionSuite) TestSameNameObjectWithLocalTemporaryTable(c *C) { " `cs1` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) } + +func (s *testSessionSuite) TestWriteOnMultipleCachedTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists ct1, ct2") + tk.MustExec("create table ct1 (id int, c int)") + tk.MustExec("create table ct2 (id int, c int)") + tk.MustExec("alter table ct1 cache") + tk.MustExec("alter table ct2 cache") + tk.MustQuery("select * from ct1").Check(testkit.Rows()) + tk.MustQuery("select * from ct2").Check(testkit.Rows()) + + cached := false + for i :=0; i<50; i++ { + if tk.HasPlan("select * from ct1", "Union") { + if tk.HasPlan("select * from ct2", "Union") { + cached = true + break + } + } + time.Sleep(100*time.Millisecond) + } + c.Assert(cached, IsTrue) + + tk.MustExec("begin") + tk.MustExec("insert into ct1 values (3, 4)") + tk.MustExec("insert into ct2 values (5, 6)") + tk.MustExec("commit") + + tk.MustQuery("select * from ct1").Check(testkit.Rows("3 4")) + tk.MustQuery("select * from ct2").Check(testkit.Rows("5 6")) 
+} From fc708df7067c5f15d3ba2c1cd04efd10878dd39a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 14 Dec 2021 09:25:53 +0800 Subject: [PATCH 09/13] address comment --- session/session.go | 6 ++++-- session/session_test.go | 4 ++-- table/tables/state_remote.go | 3 --- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/session/session.go b/session/session.go index 8bb0f64f2ca94..c7a8e27927c2c 100644 --- a/session/session.go +++ b/session/session.go @@ -602,6 +602,8 @@ func (c *cachedTableRenewLease) start(ctx context.Context) error { return err } +const cacheTableWriteLease = 5 * time.Second + func (c *cachedTableRenewLease) keepAlive(ctx context.Context, wg chan error, handle tables.StateRemote, tid int64, leasePtr *uint64) { writeLockLease, err := handle.LockForWrite(ctx, tid) atomic.StoreUint64(leasePtr, writeLockLease) @@ -611,7 +613,7 @@ func (c *cachedTableRenewLease) keepAlive(ctx context.Context, wg chan error, ha return } - t := time.NewTicker(5 * time.Second) + t := time.NewTicker(cacheTableWriteLease) defer t.Stop() for { select { @@ -629,7 +631,7 @@ func (c *cachedTableRenewLease) keepAlive(ctx context.Context, wg chan error, ha func (c *cachedTableRenewLease) renew(ctx context.Context, handle tables.StateRemote, tid int64, leasePtr *uint64) error { oldLease := atomic.LoadUint64(leasePtr) physicalTime := oracle.GetTimeFromTS(oldLease) - newLease := oracle.GoTimeToTS(physicalTime.Add(3 * time.Second)) + newLease := oracle.GoTimeToTS(physicalTime.Add(cacheTableWriteLease)) succ, err := handle.RenewLease(ctx, tid, newLease, tables.RenewWriteLease) if err != nil { diff --git a/session/session_test.go b/session/session_test.go index 95440ca7583bf..ba77b4f46d7e0 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5902,14 +5902,14 @@ func (s *testSessionSuite) TestWriteOnMultipleCachedTable(c *C) { tk.MustQuery("select * from ct2").Check(testkit.Rows()) cached := false - for i :=0; i<50; i++ { + for i := 0; i < 50; i++ { if 
tk.HasPlan("select * from ct1", "Union") { if tk.HasPlan("select * from ct2", "Union") { cached = true break } } - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) } c.Assert(cached, IsTrue) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 30027537d7494..aeddd5b972ab2 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -181,9 +181,6 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (wa tid); err != nil { return errors.Trace(err) } - if err = h.commitTxn(ctx); err != nil { - return errors.Trace(err) - } // Wait for lease to expire, and then retry. waitAndRetry = waitForLeaseExpire(lease, now) From faf1d964515039887f813f5c2ee74c30c2ec895b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 14 Dec 2021 16:31:46 +0800 Subject: [PATCH 10/13] update the SetOption name --- go.mod | 2 +- kv/option.go | 6 +++--- session/session.go | 12 +++++++++++- store/driver/txn/txn_driver.go | 4 ++-- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index fbed13d84616b..65c1b291124c4 100644 --- a/go.mod +++ b/go.mod @@ -103,4 +103,4 @@ replace github.com/pingcap/tidb/parser => ./parser // fix potential security issue(CVE-2020-26160) introduced by indirect dependency. replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible -replace github.com/tikv/client-go/v2 => github.com/tiancaiamao/client-go/v2 v2.0.0-alpha.0.20211211014557-6e8f369b632c +replace github.com/tikv/client-go/v2 => ../../tikv/client-go diff --git a/kv/option.go b/kv/option.go index 2b725496b0571..2a7a17fedcb6c 100644 --- a/kv/option.go +++ b/kv/option.go @@ -71,9 +71,9 @@ const ( // SnapInterceptor is used for setting the interceptor for snapshot SnapInterceptor - // CachedTableWriteLease is the write lock lease for cached table, the write lock protect the other TiDB from using cache. 
- // The transaction commit ts must be greater than the write lock lease value. - CachedTableWriteLease + // CommitTSUpperBoundCheck is used by cached tables. + // The commitTS must be greater than all the write lock lease of the visited cached table. + CommitTSUpperBoundCheck ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index c7a8e27927c2c..50f569a4f750a 100644 --- a/session/session.go +++ b/session/session.go @@ -569,7 +569,7 @@ func (s *session) doCommit(ctx context.Context) error { if err != nil { return errors.Trace(err) } - s.txn.SetOption(kv.CachedTableWriteLease, c.lease) + s.txn.SetOption(kv.CommitTSUpperBoundCheck, c.commitTSCheck) } return s.commitTxnWithTemporaryData(tikvutil.SetSessionID(ctx, sessVars.ConnectionID), &s.txn) @@ -647,6 +647,16 @@ func (c *cachedTableRenewLease) stop(ctx context.Context) { close(c.exit) } +func (c *cachedTableRenewLease) commitTSCheck(commitTS uint64) bool { + for i := 0; i < len(c.lease); i++ { + lease := atomic.LoadUint64(&c.lease[i]) + if commitTS >= lease { + return false + } + } + return true +} + func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transaction) error { sessVars := s.sessionVars txnTempTables := sessVars.TxnCtx.TemporaryTables diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index dca2d19494a80..717bf3b154761 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -230,8 +230,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.SetKVFilter(val.(tikv.KVFilter)) case kv.SnapInterceptor: txn.snapshotInterceptor = val.(kv.SnapshotInterceptor) - case kv.CachedTableWriteLease: - txn.KVTxn.SetCachedTableWriteLease(val.([]uint64)) + case kv.CommitTSUpperBoundCheck: + txn.KVTxn.SetCommitTSUpperBoundCheck(val.(func(commitTS uint64) bool)) } } From 8a5baeabd55a5156a221772b484f6fa03b8c4f75 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 15 Dec
2021 10:55:12 +0800 Subject: [PATCH 11/13] address comment --- go.mod | 4 +--- go.sum | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 65c1b291124c4..7f9340201aad6 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211206072923-c0e876615440 + github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible @@ -102,5 +102,3 @@ replace github.com/pingcap/tidb/parser => ./parser // fix potential security issue(CVE-2020-26160) introduced by indirect dependency. replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible - -replace github.com/tikv/client-go/v2 => ../../tikv/client-go diff --git a/go.sum b/go.sum index d6fb16b08d8e9..9fd7e6293040f 100644 --- a/go.sum +++ b/go.sum @@ -716,6 +716,8 @@ github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211206072923-c0e876615440 h1:XHRkMms0v6uxUZqErwZbmAs7baVVyNcOC1oOSz+BGgc= github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211206072923-c0e876615440/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 h1:B+cAIm2P1/SNsVV1vL9/mRaGUVl/vdgV8MU03O0vY28= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee 
h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs= github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI= From 1bf7acc2559a6ea63eb48f9411fade561f85e3b3 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 15 Dec 2021 11:19:00 +0800 Subject: [PATCH 12/13] make golint happy --- table/tables/state_remote_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index 3129178bca3dd..dc4e9272b1830 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -95,6 +95,7 @@ func TestStateRemote(t *testing.T) { require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, writeLease, lease) require.Greater(t, writeLease, leaseVal) // Lock for write again From 14ec518b6cf98c766d1b56ce92b1ef111d3245c3 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 16 Dec 2021 10:33:27 +0800 Subject: [PATCH 13/13] Update session/session.go Co-authored-by: tangenta --- session/session.go | 1 + 1 file changed, 1 insertion(+) diff --git a/session/session.go b/session/session.go index 09440ec19aabe..8e4c0ea46404e 100644 --- a/session/session.go +++ b/session/session.go @@ -651,6 +651,7 @@ func (c *cachedTableRenewLease) commitTSCheck(commitTS uint64) bool { for i := 0; i < len(c.lease); i++ { lease := atomic.LoadUint64(&c.lease[i]) if commitTS >= lease { + // Txn fails to commit because the write lease is expired. return false } }