From 7dd848e3637c203562e98e9bc2b40a19cb83a57b Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Tue, 16 Nov 2021 17:21:11 +0800 Subject: [PATCH 01/19] draft_renew_read --- table/table.go | 4 ++- table/tables/cache.go | 55 ++++++++++++++++++++++++++++++++++- table/tables/cache_test.go | 56 ++++++++++++++++++++++++++++++++++++ table/tables/state_remote.go | 54 ++++++++++++++++++++++++++++++++-- table/tables/tables.go | 7 +++++ 5 files changed, 171 insertions(+), 5 deletions(-) diff --git a/table/table.go b/table/table.go index f39e9b2d9fa8d..19698912f708d 100644 --- a/table/table.go +++ b/table/table.go @@ -20,7 +20,6 @@ package table import ( "context" - "github.com/opentracing/opentracing-go" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" @@ -258,4 +257,7 @@ type CachedTable interface { // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table UpdateLockForRead(store kv.Storage, ts uint64) error + + // MockGetDataLease only for test renew Lease + MockGetDataLease() (uint64, uint64) } diff --git a/table/tables/cache.go b/table/tables/cache.go index ba8786a65bae3..0b92a33dba5ab 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" @@ -29,15 +30,32 @@ import ( "github.com/tikv/client-go/v2/tikv" ) +// RenewLeaseType define the type for renew lease. +type RenewLeaseType int + +const ( + // RenewReadLease means renew read lease. + RenewReadLease RenewLeaseType = iota + 1 + // RenewWriteLease means renew write lease. + RenewWriteLease +) + var ( _ table.Table = &cachedTable{} _ table.CachedTable = &cachedTable{} ) +type renewInfo struct { + ts uint64 + *cacheData + op RenewLeaseType +} + type cachedTable struct { TableCommon cacheData atomic.Value handle StateRemote + renewCh chan renewInfo } // cacheData pack the cache data and lease. @@ -72,6 +90,13 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { } data := tmp.(*cacheData) if ts >= data.Start && ts < data.Lease { + startTime := oracle.GetTimeFromTS(data.Start) + nowTime := oracle.GetTimeFromTS(ts) + after := nowTime.Sub(startTime) + // TODO make this configurable in the following PRs + if after >= (2 * time.Second) { + c.renewCh <- renewInfo{ts: ts, cacheData: data, op: RenewReadLease} + } return data } return nil @@ -92,8 +117,9 @@ func NewCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ TableCommon: *tbl, handle: &mockStateRemoteHandle{mockStateRemote.Ch}, + renewCh: make(chan renewInfo, 10), } - + go ret.renewLease() return ret, nil } @@ -203,3 +229,30 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type } return c.TableCommon.RemoveRecord(ctx, h, r) } + +func (c *cachedTable) renewLease() { + for renewInfo := range c.renewCh { + tid := c.Meta().ID + lease := leaseFromTS(renewInfo.ts) + succ, err := c.handle.RenewLease(tid, lease, renewInfo.op) + if err != nil { + log.Warn("Renew read lease error") + } + if succ { + c.cacheData.Store(&cacheData{ + Start: renewInfo.ts, + Lease: lease, + MemBuffer: renewInfo.cacheData, + }) + } + } +} + +func (c *cachedTable) MockGetDataLease() (uint64, uint64) { + tmp := c.cacheData.Load() + if tmp == nil { + return 0, 0 + } + data := tmp.(*cacheData) + return data.Start, data.Lease +} diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index e1c55ab3d97af..73e88edbbbd4c 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,11 +15,16 @@ package tables_test import ( + "context" "testing" "time" + "github.com/pingcap/tidb/parser/model" + table2 "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func TestCacheTableBasicScan(t *testing.T) { @@ -389,3 +394,54 @@ func TestCacheTableBatchPointGet(t *testing.T) { tk.MustQuery("select * from bp_cache_tmp1 where id in (1, 4)").Check(testkit.Rows("1 11 101")) tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 14)").Check(testkit.Rows("1 11 101")) } + +func TestRenewLease(t *testing.T) { + // Test RenewLeaseForRead + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + table := &model.TableInfo{ + ID: 1, + Name: model.NewCIStr("t"), + Charset: "utf8", + Collate: "utf8_bin", + TableCacheStatusType: model.TableCacheStatusEnable, + PKIsHandle: true, + } + ctx := context.Background() + tbl := tables.MockTableFromMeta(table) + cacheTable := tbl.(table2.CachedTable) + err := se.NewTxn(ctx) + require.NoError(t, err) + txn, err := se.Txn(true) + require.NoError(t, err) + data := cacheTable.TryReadFromCache(txn.StartTS()) + require.Equal(t, data, nil) + for { + err := se.NewTxn(ctx) + require.NoError(t, err) + txn, err := se.Txn(true) + err = cacheTable.UpdateLockForRead(se.GetStore(), txn.StartTS()) + if err == nil { + break + } + } + startTs, _ := cacheTable.MockGetDataLease() + physicalTime := oracle.GetTimeFromTS(startTs) + lease := oracle.GoTimeToTS(physicalTime.Add(2*time.Second + 500*time.Millisecond)) + data = cacheTable.TryReadFromCache(lease) + require.NotNil(t, data) + physicalTime = oracle.GetTimeFromTS(lease) + newLease := oracle.GoTimeToTS(physicalTime.Add(3 * time.Second)) + var i int + for i = 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + _, lease := cacheTable.MockGetDataLease() + if lease == newLease { + break + } + } + require.True(t, i < 10) +} diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 5ef43271955b7..344e31157a516 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -48,8 +48,8 @@ type StateRemote interface { // LockForWrite try to add a write lock to the table with the specified tableID LockForWrite(tid int64, now, ts uint64) error - // RenewLease attempt to renew the read lock on the table with the specified tableID - RenewLease(tid int64, ts uint64) (bool, error) + // RenewLease attempt to renew the read / write lock on the table with the specified tableID + RenewLease(tid int64, ts uint64, op RenewLeaseType) (bool, error) } // mockStateRemoteHandle implement the StateRemote interface. @@ -101,7 +101,17 @@ func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { return op.err } -func (r *mockStateRemoteHandle) RenewLease(tid int64, ts uint64) (bool, error) { +func (r *mockStateRemoteHandle) RenewLease(tid int64, ts uint64, op RenewLeaseType) (bool, error) { + switch op { + case RenewReadLease: + op := &renewLeaseForReadOP{tid: tid, ts: ts} + op.Add(1) + r.ch <- op + op.Wait() + return op.succ, op.err + case RenewWriteLease: + // TODO : Renew Write Lease will implement in next pr. + } return false, errors.New("not implemented yet") } @@ -168,6 +178,23 @@ func (op *lockForWriteOP) Exec(data *mockStateRemoteData) { op.Done() } +// renewForReadOP is a kind of remote task +type renewLeaseForReadOP struct { + sync.WaitGroup + // Input + tid int64 + ts uint64 + + // Output + succ bool + err error +} + +func (op *renewLeaseForReadOP) Exec(r *mockStateRemoteData) { + op.succ, op.err = r.renewLeaseForRead(op.tid, op.ts) + op.Done() +} + type mockStateRemoteData struct { data map[int64]*stateRecord } @@ -276,3 +303,24 @@ func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, e } return 0, nil } + +func (r *mockStateRemoteData) renewLeaseForRead(tid int64, ts uint64) (bool, error) { + record, ok := r.data[tid] + if !ok { + record = &stateRecord{ + lockLease: ts, + oldReadLease: ts, + lockType: CachedTableLockRead, + } + r.data[tid] = record + return true, nil + } + if record.lockType != CachedTableLockRead { + return false, errors.New("The read lock can be renewed only in the read lock state") + } + if record.lockLease > ts { + return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") + } + record.lockLease = ts + return true, nil +} diff --git a/table/tables/tables.go b/table/tables/tables.go index ecb7e23137745..5adf626ca2f78 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -84,6 +84,13 @@ func MockTableFromMeta(tblInfo *model.TableInfo) table.Table { var t TableCommon initTableCommon(&t, tblInfo, tblInfo.ID, columns, nil) + if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable { + ret, err := NewCachedTable(&t) + if err != nil { + return nil + } + return ret + } if tblInfo.GetPartitionInfo() == nil { if err := initTableIndices(&t); err != nil { return nil From 332c11a3080d54645a2dbba3ac97c840bdec0c84 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Wed, 17 Nov 2021 11:37:06 +0800 Subject: [PATCH 02/19] bug_leak --- domain/domain.go | 7 +++++++ infoschema/builder.go | 14 ++++++++++++++ infoschema/infoschema.go | 12 ++++++++++++ table/table.go | 4 ++++ table/tables/cache.go | 9 +++++++-- table/tables/cache_test.go | 1 + 6 files changed, 45 insertions(+), 2 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 8ed2c18e58cd4..8963585ae0d90 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -287,6 +287,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 actions = append(actions, uint64(1< Date: Wed, 17 Nov 2021 21:03:32 +0800 Subject: [PATCH 03/19] bug --- domain/domain.go | 8 ++------ infoschema/builder.go | 13 +------------ infoschema/infoschema.go | 24 ++++++++++++++---------- table/table.go | 2 -- table/tables/cache.go | 16 ++++++++++++---- table/tables/cache_test.go | 2 +- 6 files changed, 30 insertions(+), 35 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 8963585ae0d90..6b7f152d298cd 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -680,15 +680,12 @@ func (do *Domain) Close() { do.wg.Wait() do.sysSessionPool.Close() variable.UnregisterStatistics(do.bindHandle) + do.InfoSchema().CloseRenewCh() if do.onClose != nil { do.onClose() } - if do.InfoSchema().RenewChs() != nil { - for _, ch := range do.InfoSchema().RenewChs() { - *ch <- struct{}{} - } - } + logutil.BgLogger().Info("domain closed", zap.Duration("take time", time.Since(startTime))) } @@ -835,7 +832,6 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R go do.topNSlowQueryLoop() go do.infoSyncerKeeper() go do.globalConfigSyncerKeeper() - if !skipRegisterToDashboard { do.wg.Add(1) go do.topologySyncerKeeper() diff --git a/infoschema/builder.go b/infoschema/builder.go index d7456ab878a94..11c8fba1e52d0 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -443,17 +443,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i return nil, errors.Trace(err) } if tbl.Meta().TableCacheStatusType != model.TableCacheStatusDisable { - cacheTable := tbl.(table.CachedTable) - b.is.renewChMap[tblInfo.ID] = make(chan struct{}, 1) - go func() { - for { - select { - case <-b.is.renewChMap[tblInfo.ID]: - cacheTable.CloseRenewCh() - default: - } - } - }() + b.is.renewChs = append(b.is.renewChs, tblInfo.ID) } tableNames := b.is.schemaMap[dbInfo.Name.L] tableNames.tables[tblInfo.Name.L] = tbl @@ -679,7 +669,6 @@ func NewBuilder(store kv.Storage) *Builder { policyMap: map[string]*model.PolicyInfo{}, ruleBundleMap: map[string]*placement.Bundle{}, sortedTablesBuckets: make([]sortedTables, bucketCount), - renewChMap: make(map[int64]chan struct{}), }, } } diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index bd8292ac63397..01f91f6966537 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -59,7 +59,7 @@ type InfoSchema interface { RuleBundles() []*placement.Bundle // AllPlacementPolicies returns all placement policies AllPlacementPolicies() []*model.PolicyInfo - RenewChs() []*chan struct{} + CloseRenewCh() } type sortedTables []table.Table @@ -111,7 +111,19 @@ type infoSchema struct { schemaMetaVersion int64 // - renewChMap map[int64] chan struct{} + //renewMutex sync.RWMutex + renewChs []int64 +} + +func (is *infoSchema) CloseRenewCh() { + //is.renewMutex.RLock() + //defer is.renewMutex.RUnlock() + for _, id := range is.renewChs { + tbl, ok := is.TableByID(id) + if ok { + tbl.(table.CachedTable).CloseRenewCh() + } + } } // MockInfoSchema only serves for test. @@ -321,14 +333,6 @@ func (is *infoSchema) Clone() (result []*model.DBInfo) { return } -func (is *infoSchema) RenewChs() []*chan struct{} { - var chs []*chan struct{} - for _, ch := range is.renewChMap { - chs = append(chs, &ch) - } - return chs -} - // GetSequenceByName gets the sequence by name. func GetSequenceByName(is InfoSchema, schema, sequence model.CIStr) (util.SequenceTable, error) { tbl, err := is.TableByName(schema, sequence) diff --git a/table/table.go b/table/table.go index 8bf2662c6192d..302a0144a3117 100644 --- a/table/table.go +++ b/table/table.go @@ -261,7 +261,5 @@ type CachedTable interface { // MockGetDataLease only for test renew Lease MockGetDataLease() (uint64, uint64) - RenewLease() - CloseRenewCh() } diff --git a/table/tables/cache.go b/table/tables/cache.go index 64afca7d79a3a..28b16c0ecd0e2 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -56,6 +56,7 @@ type cachedTable struct { cacheData atomic.Value handle StateRemote renewCh chan renewInfo + stopCh chan struct{} } // cacheData pack the cache data and lease. @@ -119,8 +120,9 @@ func NewCachedTable(tbl *TableCommon) (table.Table, error) { TableCommon: *tbl, handle: &mockStateRemoteHandle{mockStateRemote.Ch}, renewCh: make(chan renewInfo, 10), + stopCh: make(chan struct{}, 1), } - go ret.RenewLease() + go ret.renewLease() return ret, nil } @@ -231,7 +233,13 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return c.TableCommon.RemoveRecord(ctx, h, r) } -func (c *cachedTable) RenewLease() { +func (c *cachedTable) renewLease() { + select { + case <-c.stopCh: + close(c.renewCh) + return + default: + } for renewInfo := range c.renewCh { tid := c.Meta().ID lease := leaseFromTS(renewInfo.ts) @@ -258,6 +266,6 @@ func (c *cachedTable) MockGetDataLease() (uint64, uint64) { return data.Start, data.Lease } -func (c *cachedTable ) CloseRenewCh() { - close(c.renewCh) +func (c *cachedTable) CloseRenewCh() { + c.stopCh <- struct{}{} } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index 1a43697ddd05d..ffd14f766a7df 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -413,7 +413,6 @@ func TestRenewLease(t *testing.T) { ctx := context.Background() tbl := tables.MockTableFromMeta(table) cacheTable := tbl.(table2.CachedTable) - defer cacheTable.CloseRenewCh() err := se.NewTxn(ctx) require.NoError(t, err) txn, err := se.Txn(true) @@ -445,4 +444,5 @@ func TestRenewLease(t *testing.T) { } } require.True(t, i < 10) + cacheTable.CloseRenewCh() } From 7734a44b43025b2aed40411a6a5b829f84805067 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Thu, 18 Nov 2021 16:47:56 +0800 Subject: [PATCH 04/19] save_bug --- domain/domain.go | 41 +++++++++++++++++++++++++--- infoschema/builder.go | 12 ++++++++- infoschema/infoschema.go | 15 +++-------- table/table.go | 2 ++ table/tables/cache.go | 55 +++++++++++++++++++++----------------- table/tables/cache_test.go | 11 ++++---- 6 files changed, 92 insertions(+), 44 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 6b7f152d298cd..e83a10d2024e6 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -17,6 +17,7 @@ package domain import ( "context" "fmt" + "github.com/pingcap/tidb/table" "math/rand" "strconv" "sync" @@ -93,7 +94,8 @@ type Domain struct { serverID uint64 serverIDSession *concurrency.Session isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. - + cachedTableIDs *[]int64 + closeCh chan struct{} onClose func() } @@ -407,7 +409,7 @@ func (do *Domain) Reload() error { // lease renew, so it must be executed despite it is cache or not do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes) - + do.cachedTableIDs = is.CachedTableIDs() lease := do.DDL().GetLease() sub := time.Since(startTime) // Reload interval is lease / 2, if load schema time elapses more than this interval, @@ -670,6 +672,7 @@ func (do *Domain) Close() { do.info.RemoveServerInfo() do.info.RemoveMinStartTS() } + do.closeCh <- struct {}{} close(do.exit) if do.etcdClient != nil { terror.Log(errors.Trace(do.etcdClient.Close())) @@ -678,9 +681,9 @@ func (do *Domain) Close() { do.slowQuery.Close() do.cancel() do.wg.Wait() + //close(do.closeCh) do.sysSessionPool.Close() variable.UnregisterStatistics(do.bindHandle) - do.InfoSchema().CloseRenewCh() if do.onClose != nil { do.onClose() } @@ -704,6 +707,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio indexUsageSyncLease: idxUsageSyncLease, planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, + closeCh: make(chan struct{}), } do.SchemaValidator = NewSchemaValidator(ddlLease, do) @@ -837,6 +841,9 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R go do.topologySyncerKeeper() } + do.wg.Add(1) + go do.closeRenewCh() + return nil } @@ -1732,6 +1739,34 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) { do.infoCache.Insert(is, 0) } +func (do *Domain) closeRenewCh() { + defer func() { + do.wg.Done() + logutil.BgLogger().Info("renew ch close") + }() + for { + select { + case <-do.closeCh: + logutil.BgLogger().Info("==delete mode ==") + for _, id := range *do.cachedTableIDs { + tbl, ok := do.InfoSchema().TableByID(id) + if ok { + tbl.(table.CachedTable).CloseRenewCh() + } + } + return + default: + logutil.BgLogger().Info("==default mode ==") + for _, id := range *do.cachedTableIDs { + tbl, ok := do.InfoSchema().TableByID(id) + if ok { + tbl.(table.CachedTable).RenewLease() + } + } + } + } +} + func init() { initByLDFlagsForGlobalKill() } diff --git a/infoschema/builder.go b/infoschema/builder.go index 11c8fba1e52d0..7d08b7e193450 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -443,7 +443,17 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i return nil, errors.Trace(err) } if tbl.Meta().TableCacheStatusType != model.TableCacheStatusDisable { - b.is.renewChs = append(b.is.renewChs, tblInfo.ID) + b.is.renewMutex.Lock() + defer b.is.renewMutex.Unlock() + isExisted := false + for i := 0; i < len(b.is.renewChs); i++ { + if b.is.renewChs[i] == tableID { + isExisted = true + } + } + if !isExisted { + b.is.renewChs = append(b.is.renewChs, tblInfo.ID) + } } tableNames := b.is.schemaMap[dbInfo.Name.L] tableNames.tables[tblInfo.Name.L] = tbl diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 01f91f6966537..3f2e00377adad 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -59,7 +59,7 @@ type InfoSchema interface { RuleBundles() []*placement.Bundle // AllPlacementPolicies returns all placement policies AllPlacementPolicies() []*model.PolicyInfo - CloseRenewCh() + CachedTableIDs() *[]int64 } type sortedTables []table.Table @@ -111,19 +111,12 @@ type infoSchema struct { schemaMetaVersion int64 // - //renewMutex sync.RWMutex + renewMutex sync.RWMutex renewChs []int64 } -func (is *infoSchema) CloseRenewCh() { - //is.renewMutex.RLock() - //defer is.renewMutex.RUnlock() - for _, id := range is.renewChs { - tbl, ok := is.TableByID(id) - if ok { - tbl.(table.CachedTable).CloseRenewCh() - } - } +func (is *infoSchema ) CachedTableIDs() *[]int64 { + return &is.renewChs } // MockInfoSchema only serves for test. diff --git a/table/table.go b/table/table.go index 302a0144a3117..2d0777917f0ed 100644 --- a/table/table.go +++ b/table/table.go @@ -262,4 +262,6 @@ type CachedTable interface { MockGetDataLease() (uint64, uint64) CloseRenewCh() + + RenewLease() } diff --git a/table/tables/cache.go b/table/tables/cache.go index 28b16c0ecd0e2..625cb2ffd2e2f 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -119,10 +119,10 @@ func NewCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ TableCommon: *tbl, handle: &mockStateRemoteHandle{mockStateRemote.Ch}, - renewCh: make(chan renewInfo, 10), - stopCh: make(chan struct{}, 1), + renewCh: make(chan renewInfo), + stopCh: make(chan struct{}), } - go ret.renewLease() + //go ret.renewLease() return ret, nil } @@ -233,30 +233,33 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return c.TableCommon.RemoveRecord(ctx, h, r) } -func (c *cachedTable) renewLease() { - select { - case <-c.stopCh: - close(c.renewCh) - return - default: - } - for renewInfo := range c.renewCh { - tid := c.Meta().ID - lease := leaseFromTS(renewInfo.ts) - succ, err := c.handle.RenewLease(tid, lease, renewInfo.op) - if err != nil { - log.Warn("Renew read lease error") - } - if succ { - c.cacheData.Store(&cacheData{ - Start: renewInfo.ts, - Lease: lease, - MemBuffer: renewInfo.cacheData, - }) +func (c *cachedTable) RenewLease() { + for { + select { + case <-c.stopCh: + close(c.renewCh) + return + default: + for renewInfo := range c.renewCh { + tid := c.Meta().ID + lease := leaseFromTS(renewInfo.ts) + succ, err := c.handle.RenewLease(tid, lease, renewInfo.op) + if err != nil { + log.Warn("Renew read lease error") + } + if succ { + c.cacheData.Store(&cacheData{ + Start: renewInfo.ts, + Lease: lease, + MemBuffer: renewInfo.cacheData, + }) + } + } } } } +// MockGetDataLease is only for test func (c *cachedTable) MockGetDataLease() (uint64, uint64) { tmp := c.cacheData.Load() if tmp == nil { @@ -267,5 +270,9 @@ func (c *cachedTable) MockGetDataLease() (uint64, uint64) { } func (c *cachedTable) CloseRenewCh() { - c.stopCh <- struct{}{} + _, ok := <-c.renewCh + if ok { + close(c.renewCh) + } + //c.stopCh <- struct{}{} } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index ffd14f766a7df..e1920c6fd0b9c 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -20,7 +20,7 @@ import ( "time" "github.com/pingcap/tidb/parser/model" - table2 "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" @@ -265,6 +265,7 @@ func TestCacheTableComplexRead(t *testing.T) { require.True(t, i < 10) tk2.MustExec("commit") doneCh <- struct{}{} + return }() <-doneCh tk1.HasPlan("select *from complex_cache where id > 7", "UnionScan") @@ -363,7 +364,7 @@ func TestCacheTableBatchPointGet(t *testing.T) { tk.MustExec("insert into bp_cache_tmp1 values(2, 12, 102)") tk.MustExec("insert into bp_cache_tmp1 values(3, 13, 103)") tk.MustExec("insert into bp_cache_tmp1 values(4, 14, 104)") - + tk.MustExec("alter table bp_cache_tmp1 cache") // check point get out transaction tk.MustQuery("select * from bp_cache_tmp1 where id in (1, 3)").Check(testkit.Rows("1 11 101", "3 13 103")) tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 13)").Check(testkit.Rows("1 11 101", "3 13 103")) @@ -402,7 +403,7 @@ func TestRenewLease(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") se := tk.Session() - table := &model.TableInfo{ + tableInfo := &model.TableInfo{ ID: 1, Name: model.NewCIStr("t"), Charset: "utf8", @@ -411,8 +412,8 @@ func TestRenewLease(t *testing.T) { PKIsHandle: true, } ctx := context.Background() - tbl := tables.MockTableFromMeta(table) - cacheTable := tbl.(table2.CachedTable) + tbl := tables.MockTableFromMeta(tableInfo) + cacheTable := tbl.(table.CachedTable) err := se.NewTxn(ctx) require.NoError(t, err) txn, err := se.Txn(true) From 2485db3f4996748dc25cb2add6f5618f0b540c6d Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 19 Nov 2021 21:37:54 +0800 Subject: [PATCH 05/19] draft_1 --- domain/domain.go | 45 ++++++++++----------------- infoschema/builder.go | 37 ++++++++++++---------- table/table.go | 6 ++-- table/tables/cache.go | 63 +++++++++++++++----------------------- table/tables/cache_test.go | 52 ++++++++----------------------- 5 files changed, 75 insertions(+), 128 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index e83a10d2024e6..3bef8be807c63 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -17,7 +17,6 @@ package domain import ( "context" "fmt" - "github.com/pingcap/tidb/table" "math/rand" "strconv" "sync" @@ -50,6 +49,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" @@ -94,9 +94,8 @@ type Domain struct { serverID uint64 serverIDSession *concurrency.Session isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. - cachedTableIDs *[]int64 - closeCh chan struct{} - onClose func() + renewCh chan interface{} + onClose func() } // loadInfoSchema loads infoschema at startTS. @@ -161,7 +160,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store()).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -273,7 +272,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do.Store(), do.renewCh).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -409,7 +408,7 @@ func (do *Domain) Reload() error { // lease renew, so it must be executed despite it is cache or not do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes) - do.cachedTableIDs = is.CachedTableIDs() + //do.cachedTableIDs = is.CachedTableIDs() lease := do.DDL().GetLease() sub := time.Since(startTime) // Reload interval is lease / 2, if load schema time elapses more than this interval, @@ -672,7 +671,7 @@ func (do *Domain) Close() { do.info.RemoveServerInfo() do.info.RemoveMinStartTS() } - do.closeCh <- struct {}{} + //do.closeCh <- struct{}{} close(do.exit) if do.etcdClient != nil { terror.Log(errors.Trace(do.etcdClient.Close())) @@ -688,7 +687,6 @@ func (do *Domain) Close() { do.onClose() } - logutil.BgLogger().Info("domain closed", zap.Duration("take time", time.Since(startTime))) } @@ -707,7 +705,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio indexUsageSyncLease: idxUsageSyncLease, planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, - closeCh: make(chan struct{}), + renewCh: make(chan interface{}, 10), } do.SchemaValidator = NewSchemaValidator(ddlLease, do) @@ -842,7 +840,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R } do.wg.Add(1) - go do.closeRenewCh() + go do.renewLease() return nil } @@ -1739,32 +1737,23 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) { do.infoCache.Insert(is, 0) } -func (do *Domain) closeRenewCh() { +func (do *Domain) renewLease() { defer func() { do.wg.Done() - logutil.BgLogger().Info("renew ch close") + logutil.BgLogger().Info("renew lease exited.") }() for { select { - case <-do.closeCh: - logutil.BgLogger().Info("==delete mode ==") - for _, id := range *do.cachedTableIDs { - tbl, ok := do.InfoSchema().TableByID(id) - if ok { - tbl.(table.CachedTable).CloseRenewCh() - } - } + case <-do.exit: return + case renewCh := <-do.renewCh: + renewInfo := renewCh.(tables.RenewInfo) + renewInfo.CacheTable.RenewLease(renewInfo) + default: - logutil.BgLogger().Info("==default mode ==") - for _, id := range *do.cachedTableIDs { - tbl, ok := do.InfoSchema().TableByID(id) - if ok { - tbl.(table.CachedTable).RenewLease() - } - } } } + } func init() { diff --git a/infoschema/builder.go b/infoschema/builder.go index 7d08b7e193450..eb92f1338af5d 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -40,7 +40,8 @@ type Builder struct { is *infoSchema // TODO: store is only used by autoid allocators // detach allocators from storage, use passed transaction in the feature - store kv.Storage + store kv.Storage + renewCh chan interface{} } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -438,23 +439,10 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i } } } - tbl, err := tables.TableFromMeta(allocs, tblInfo) + tbl, err := b.tableFromMeta(allocs, tblInfo) if err != nil { return nil, errors.Trace(err) } - if tbl.Meta().TableCacheStatusType != model.TableCacheStatusDisable { - b.is.renewMutex.Lock() - defer b.is.renewMutex.Unlock() - isExisted := false - for i := 0; i < len(b.is.renewChs); i++ { - if b.is.renewChs[i] == tableID { - isExisted = true - } - } - if !isExisted { - b.is.renewChs = append(b.is.renewChs, tblInfo.ID) - } - } tableNames := b.is.schemaMap[dbInfo.Name.L] tableNames.tables[tblInfo.Name.L] = tbl bucketIdx := tableBucketIdx(tableID) @@ -614,7 +602,8 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. } for _, di := range dbInfos { - err := b.createSchemaTablesForDB(di, tables.TableFromMeta) + //err := b.createSchemaTablesForDB(di, tables.TableFromMeta) + err := b.createSchemaTablesForDB(di, b.tableFromMeta) if err != nil { return nil, errors.Trace(err) } @@ -634,6 +623,19 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. } return b, nil } +func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) { + ret, err := tables.TableFromMeta(alloc, tblInfo) + if err != nil { + return nil, errors.Trace(err) + } + if t, ok := ret.(table.CachedTable); ok { + err = t.Init(b.renewCh) + if err != nil { + return nil, errors.Trace(err) + } + } + return ret, nil +} type tableFromMetaFunc func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) @@ -671,7 +673,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(store kv.Storage) *Builder { +func NewBuilder(store kv.Storage, renewCh chan interface{}) *Builder { return &Builder{ store: store, is: &infoSchema{ @@ -680,6 +682,7 @@ func NewBuilder(store kv.Storage) *Builder { ruleBundleMap: map[string]*placement.Bundle{}, sortedTablesBuckets: make([]sortedTables, bucketCount), }, + renewCh: renewCh, } } diff --git a/table/table.go b/table/table.go index 2d0777917f0ed..98adc1210cda8 100644 --- a/table/table.go +++ b/table/table.go @@ -20,6 +20,7 @@ package table import ( "context" + "github.com/opentracing/opentracing-go" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" @@ -251,6 +252,7 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table + Init(renewCh chan interface{}) error // TryReadFromCache checks if the cache table is readable. TryReadFromCache(ts uint64) kv.MemBuffer @@ -260,8 +262,4 @@ type CachedTable interface { // MockGetDataLease only for test renew Lease MockGetDataLease() (uint64, uint64) - - CloseRenewCh() - - RenewLease() } diff --git a/table/tables/cache.go b/table/tables/cache.go index 625cb2ffd2e2f..67321cbdd78fe 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -45,18 +45,18 @@ var ( _ table.CachedTable = &cachedTable{} ) -type renewInfo struct { +type RenewInfo struct { ts uint64 *cacheData - op RenewLeaseType + op RenewLeaseType + CacheTable *cachedTable } type cachedTable struct { TableCommon cacheData atomic.Value handle StateRemote - renewCh chan renewInfo - stopCh chan struct{} + renewCh chan interface{} } // cacheData pack the cache data and lease. @@ -96,7 +96,7 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { after := nowTime.Sub(startTime) // TODO make this configurable in the following PRs if after >= (2 * time.Second) { - c.renewCh <- renewInfo{ts: ts, cacheData: data, op: RenewReadLease} + c.renewCh <- RenewInfo{ts: ts, cacheData: data, op: RenewReadLease, CacheTable: c} } return data } @@ -119,13 +119,16 @@ func NewCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ TableCommon: *tbl, handle: &mockStateRemoteHandle{mockStateRemote.Ch}, - renewCh: make(chan renewInfo), - stopCh: make(chan struct{}), + renewCh: make(chan interface{}), } - //go ret.renewLease() return ret, nil } +func (c *cachedTable) Init(renewCh chan interface{}) error { + c.renewCh = renewCh + return nil +} + func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, error) { buffer, err := newMemBuffer(store) if err != nil { @@ -233,29 +236,19 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return c.TableCommon.RemoveRecord(ctx, h, r) } -func (c *cachedTable) RenewLease() { - for { - select { - case <-c.stopCh: - close(c.renewCh) - return - default: - for renewInfo := range c.renewCh { - tid := c.Meta().ID - lease := leaseFromTS(renewInfo.ts) - succ, err := c.handle.RenewLease(tid, lease, renewInfo.op) - if err != nil { - log.Warn("Renew read lease error") - } - if succ { - c.cacheData.Store(&cacheData{ - Start: renewInfo.ts, - Lease: lease, - MemBuffer: renewInfo.cacheData, - }) - } - } - } +func (c *cachedTable) RenewLease(renewInfo RenewInfo) { + tid := c.Meta().ID + lease := leaseFromTS(renewInfo.ts) + succ, err := c.handle.RenewLease(tid, lease, renewInfo.op) + if err != nil { + log.Warn("Renew read lease error") + } + if succ { + c.cacheData.Store(&cacheData{ + Start: renewInfo.ts, + Lease: lease, + MemBuffer: renewInfo.cacheData, + }) } } @@ -268,11 +261,3 @@ func (c *cachedTable) MockGetDataLease() (uint64, uint64) { data := tmp.(*cacheData) return data.Start, data.Lease } - -func (c *cachedTable) CloseRenewCh() { - _, ok := <-c.renewCh - if ok { - close(c.renewCh) - } - //c.stopCh <- struct{}{} -} diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index e1920c6fd0b9c..e0982b3b6bd07 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,16 +15,14 @@ package tables_test import ( - "context" "testing" "time" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/oracle" ) func TestCacheTableBasicScan(t *testing.T) { @@ -403,47 +401,21 @@ func TestRenewLease(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") se := tk.Session() - tableInfo := &model.TableInfo{ - ID: 1, - Name: model.NewCIStr("t"), - Charset: "utf8", - Collate: "utf8_bin", - TableCacheStatusType: model.TableCacheStatusEnable, - PKIsHandle: true, - } - ctx := context.Background() - tbl := tables.MockTableFromMeta(tableInfo) - cacheTable := tbl.(table.CachedTable) - err := se.NewTxn(ctx) + tk.MustExec("create table cache_renew_t (id int)") + tk.MustExec("alter table cache_renew_t cache") + tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t")) require.NoError(t, err) - txn, err := se.Txn(true) - require.NoError(t, err) - data := cacheTable.TryReadFromCache(txn.StartTS()) - require.Equal(t, data, nil) - for { - err := se.NewTxn(ctx) - require.NoError(t, err) - txn, err := se.Txn(true) - err = cacheTable.UpdateLockForRead(se.GetStore(), txn.StartTS()) - if err == nil { - break - } - } - startTs, _ := cacheTable.MockGetDataLease() - physicalTime := oracle.GetTimeFromTS(startTs) - lease := oracle.GoTimeToTS(physicalTime.Add(2*time.Second + 500*time.Millisecond)) - data = cacheTable.TryReadFromCache(lease) - require.NotNil(t, data) - physicalTime = oracle.GetTimeFromTS(lease) - newLease := oracle.GoTimeToTS(physicalTime.Add(3 * time.Second)) + cacheTable := tbl.(table.CachedTable) var i int - for i = 0; i < 10; i++ { - time.Sleep(100 * time.Millisecond) + tk.MustExec("select * from cache_renew_t") + _, oldLease := cacheTable.MockGetDataLease() + for i = 0; i < 20; i++ { + time.Sleep(200 * time.Millisecond) + tk.MustExec("select * from cache_renew_t") _, lease := cacheTable.MockGetDataLease() - if lease == newLease { + if lease != oldLease { break } } - require.True(t, i < 10) - cacheTable.CloseRenewCh() + require.True(t, i < 20) } From 3024cf49069a8dea27100cd92c4eccdb7d6acd9f Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 19 Nov 2021 21:50:31 +0800 Subject: [PATCH 06/19] draft_1 --- infoschema/infoschema.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 3f2e00377adad..86dbc555996d8 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -59,7 +59,6 @@ type InfoSchema interface { RuleBundles() []*placement.Bundle // AllPlacementPolicies returns all placement policies AllPlacementPolicies() []*model.PolicyInfo - CachedTableIDs() *[]int64 } type sortedTables []table.Table @@ -109,14 +108,6 @@ type infoSchema struct { // schemaMetaVersion is the version of schema, and we should check version when change schema. schemaMetaVersion int64 - - // - renewMutex sync.RWMutex - renewChs []int64 -} - -func (is *infoSchema ) CachedTableIDs() *[]int64 { - return &is.renewChs } // MockInfoSchema only serves for test. From 73c40505ac96388f12a5dc80c7c3b14729117faf Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 19 Nov 2021 21:56:15 +0800 Subject: [PATCH 07/19] draft_1 --- table/tables/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/tables/cache.go b/table/tables/cache.go index 67321cbdd78fe..a57d7e5f1ca07 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -44,7 +44,7 @@ var ( _ table.Table = &cachedTable{} _ table.CachedTable = &cachedTable{} ) - +// RenewInfo is used for domain and cacheTable to convey renew information. type RenewInfo struct { ts uint64 *cacheData From 3a35566644bf90259c90239b50c7f622a0ae7e1a Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 19 Nov 2021 22:13:53 +0800 Subject: [PATCH 08/19] draft_1 --- ddl/placement_policy_ddl_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go index c713570124efd..b29812e368a57 100644 --- a/ddl/placement_policy_ddl_test.go +++ b/ddl/placement_policy_ddl_test.go @@ -112,7 +112,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) { t4.State = model.StatePublic db1.Tables = append(db1.Tables, t4) - builder, err := infoschema.NewBuilder(store).InitWithDBInfos( + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos( []*model.DBInfo{db1, db2, dbP}, nil, []*model.PolicyInfo{p1, p2, p3, p4, p5}, From 1531867e47cd636e464fd937e16b9f657477cd8a Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 19 Nov 2021 22:36:30 +0800 Subject: [PATCH 09/19] draft_1 --- executor/slow_query_test.go | 2 +- table/tables/cache.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index feb8ac60ba4c1..66ca71897602f 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu } func newSlowQueryRetriever() (*slowQueryRetriever, error) { - newISBuilder, err := infoschema.NewBuilder(nil).InitWithDBInfos(nil, nil, nil, 0) + newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/table/tables/cache.go b/table/tables/cache.go index a57d7e5f1ca07..fc2b325cd18c3 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -44,6 +44,7 @@ var ( _ table.Table = &cachedTable{} _ table.CachedTable = &cachedTable{} ) + // RenewInfo is used for domain and cacheTable to convey renew information. type RenewInfo struct { ts uint64 From cacd8973572f5db0dd9c47be7a98b4bbc12efa56 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 19 Nov 2021 23:04:54 +0800 Subject: [PATCH 10/19] draft_1 --- infoschema/infoschema_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 1694c481d24dd..87c1695833f4d 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -109,7 +109,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom.Store()).InitWithDBInfos(dbInfos, nil, nil, 1) + builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -259,7 +259,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -326,7 +326,7 @@ func TestGetBundle(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() From f4abff4926d9f8ad54a20f28bc5b3fb935341008 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 19 Nov 2021 23:31:38 +0800 Subject: [PATCH 11/19] draft_1 --- domain/domain.go | 2 -- table/tables/cache_test.go | 1 - 2 files changed, 3 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 3bef8be807c63..7a0d7c99911e7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1749,8 +1749,6 @@ func (do *Domain) renewLease() { case renewCh := <-do.renewCh: renewInfo := renewCh.(tables.RenewInfo) renewInfo.CacheTable.RenewLease(renewInfo) - - default: } } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index e0982b3b6bd07..98d24686a5342 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -263,7 +263,6 @@ func TestCacheTableComplexRead(t *testing.T) { require.True(t, i < 10) tk2.MustExec("commit") doneCh <- struct{}{} - return }() <-doneCh tk1.HasPlan("select *from complex_cache where id > 7", "UnionScan") From c9bad74fa0c0ac015b0697238c01927e7660d7d6 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Sun, 21 Nov 2021 17:03:51 +0800 Subject: [PATCH 12/19] draft_1 --- domain/domain.go | 28 +++++--------- infoschema/builder.go | 13 ++++--- table/table.go | 6 +-- table/tables/cache.go | 72 +++++++++++++++--------------------- table/tables/cache_test.go | 7 ++-- table/tables/state_remote.go | 14 +++---- 6 files changed, 58 insertions(+), 82 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 7a0d7c99911e7..33f356b92cf8b 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -49,7 +49,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" - "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" @@ -85,7 +84,7 @@ type Domain struct { sysVarCache sysVarCache // replaces GlobalVariableCache slowQuery *topNSlowQueries expensiveQueryHandle *expensivequery.Handle - wg sync.WaitGroup + wg util.WaitGroupWrapper statsUpdating sync2.AtomicInt32 cancel context.CancelFunc indexUsageSyncLease time.Duration @@ -94,7 +93,7 @@ type Domain struct { serverID uint64 serverIDSession *concurrency.Session isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. - renewCh chan interface{} + renewLeaseCh chan func() // It is used to call the renewLease function of the cache table. onClose func() } @@ -160,7 +159,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -272,7 +271,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store(), do.renewCh).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -408,7 +407,6 @@ func (do *Domain) Reload() error { // lease renew, so it must be executed despite it is cache or not do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes) - //do.cachedTableIDs = is.CachedTableIDs() lease := do.DDL().GetLease() sub := time.Since(startTime) // Reload interval is lease / 2, if load schema time elapses more than this interval, @@ -671,7 +669,6 @@ func (do *Domain) Close() { do.info.RemoveServerInfo() do.info.RemoveMinStartTS() } - //do.closeCh <- struct{}{} close(do.exit) if do.etcdClient != nil { terror.Log(errors.Trace(do.etcdClient.Close())) @@ -680,7 +677,6 @@ func (do *Domain) Close() { do.slowQuery.Close() do.cancel() do.wg.Wait() - //close(do.closeCh) do.sysSessionPool.Close() variable.UnregisterStatistics(do.bindHandle) if do.onClose != nil { @@ -705,7 +701,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio indexUsageSyncLease: idxUsageSyncLease, planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, - renewCh: make(chan interface{}, 10), + renewLeaseCh: make(chan func(), 10), } do.SchemaValidator = NewSchemaValidator(ddlLease, do) @@ -830,18 +826,16 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ctx, ddlLease) } - do.wg.Add(3) + do.wg.Add(4) go do.topNSlowQueryLoop() go do.infoSyncerKeeper() + go do.renewLease() go do.globalConfigSyncerKeeper() if !skipRegisterToDashboard { do.wg.Add(1) go do.topologySyncerKeeper() } - do.wg.Add(1) - go do.renewLease() - return nil } @@ -1740,18 +1734,16 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) { func (do *Domain) renewLease() { defer func() { do.wg.Done() - logutil.BgLogger().Info("renew lease exited.") + logutil.BgLogger().Info("renew lease goroutine exited.") }() for { select { case <-do.exit: return - case renewCh := <-do.renewCh: - renewInfo := renewCh.(tables.RenewInfo) - renewInfo.CacheTable.RenewLease(renewInfo) + case op := <-do.renewLeaseCh: + op() } } - } func init() { diff --git a/infoschema/builder.go b/infoschema/builder.go index eb92f1338af5d..d0ab2d58ac273 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -40,8 +40,9 @@ type Builder struct { is *infoSchema // TODO: store is only used by autoid allocators // detach allocators from storage, use passed transaction in the feature - store kv.Storage - renewCh chan interface{} + store kv.Storage + // TODO: renewLeaseCh is only used to pass data between table and domain + renewLeaseCh chan func() } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -602,7 +603,6 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. } for _, di := range dbInfos { - //err := b.createSchemaTablesForDB(di, tables.TableFromMeta) err := b.createSchemaTablesForDB(di, b.tableFromMeta) if err != nil { return nil, errors.Trace(err) @@ -623,13 +623,14 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. } return b, nil } + func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) { ret, err := tables.TableFromMeta(alloc, tblInfo) if err != nil { return nil, errors.Trace(err) } if t, ok := ret.(table.CachedTable); ok { - err = t.Init(b.renewCh) + err = t.Init(b.renewLeaseCh) if err != nil { return nil, errors.Trace(err) } @@ -673,7 +674,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(store kv.Storage, renewCh chan interface{}) *Builder { +func NewBuilder(store kv.Storage, renewCh chan func()) *Builder { return &Builder{ store: store, is: &infoSchema{ @@ -682,7 +683,7 @@ func NewBuilder(store kv.Storage, renewCh chan interface{}) *Builder { ruleBundleMap: map[string]*placement.Bundle{}, sortedTablesBuckets: make([]sortedTables, bucketCount), }, - renewCh: renewCh, + renewLeaseCh: renewCh, } } diff --git a/table/table.go b/table/table.go index 98adc1210cda8..c96c77b587d19 100644 --- a/table/table.go +++ b/table/table.go @@ -252,14 +252,12 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table - Init(renewCh chan interface{}) error + Init(renewCh chan func()) error + // TryReadFromCache checks if the cache table is readable. TryReadFromCache(ts uint64) kv.MemBuffer // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table UpdateLockForRead(store kv.Storage, ts uint64) error - - // MockGetDataLease only for test renew Lease - MockGetDataLease() (uint64, uint64) } diff --git a/table/tables/cache.go b/table/tables/cache.go index fc2b325cd18c3..575530b77e877 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -45,19 +45,11 @@ var ( _ table.CachedTable = &cachedTable{} ) -// RenewInfo is used for domain and cacheTable to convey renew information. -type RenewInfo struct { - ts uint64 - *cacheData - op RenewLeaseType - CacheTable *cachedTable -} - type cachedTable struct { TableCommon cacheData atomic.Value handle StateRemote - renewCh chan interface{} + renewCh chan func() } // cacheData pack the cache data and lease. @@ -92,40 +84,42 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { } data := tmp.(*cacheData) if ts >= data.Start && ts < data.Lease { - startTime := oracle.GetTimeFromTS(data.Start) + leaseTime := oracle.GetTimeFromTS(data.Lease) nowTime := oracle.GetTimeFromTS(ts) - after := nowTime.Sub(startTime) + distance := leaseTime.Sub(nowTime) // TODO make this configurable in the following PRs - if after >= (2 * time.Second) { - c.renewCh <- RenewInfo{ts: ts, cacheData: data, op: RenewReadLease, CacheTable: c} + if distance <= (1 * time.Second) { + c.renewCh <- c.renewLease(ts, RenewReadLease, data) } return data } return nil } -var mockStateRemote = struct { +var MockStateRemote = struct { Ch chan remoteTask Data *mockStateRemoteData }{} // NewCachedTable creates a new CachedTable Instance func NewCachedTable(tbl *TableCommon) (table.Table, error) { - if mockStateRemote.Data == nil { - mockStateRemote.Data = newMockStateRemoteData() - mockStateRemote.Ch = make(chan remoteTask, 100) - go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch) + if MockStateRemote.Data == nil { + MockStateRemote.Data = newMockStateRemoteData() + MockStateRemote.Ch = make(chan remoteTask, 100) + go mockRemoteService(MockStateRemote.Data, MockStateRemote.Ch) } ret := &cachedTable{ TableCommon: *tbl, - handle: &mockStateRemoteHandle{mockStateRemote.Ch}, - renewCh: make(chan interface{}), + handle: &mockStateRemoteHandle{MockStateRemote.Ch}, + renewCh: make(chan func()), } return ret, nil } -func (c *cachedTable) Init(renewCh chan interface{}) error { +// Init is an extra operation for cachedTable after TableFromMeta, +// Because cachedTable need some additional parameter that can't be passed in TableFromMeta. +func (c *cachedTable) Init(renewCh chan func()) error { c.renewCh = renewCh return nil } @@ -237,28 +231,20 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return c.TableCommon.RemoveRecord(ctx, h, r) } -func (c *cachedTable) RenewLease(renewInfo RenewInfo) { - tid := c.Meta().ID - lease := leaseFromTS(renewInfo.ts) - succ, err := c.handle.RenewLease(tid, lease, renewInfo.op) - if err != nil { - log.Warn("Renew read lease error") - } - if succ { - c.cacheData.Store(&cacheData{ - Start: renewInfo.ts, - Lease: lease, - MemBuffer: renewInfo.cacheData, - }) +func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() { + return func() { + tid := c.Meta().ID + lease := leaseFromTS(ts) + succ, err := c.handle.RenewLease(tid, lease, op) + if err != nil { + log.Warn("Renew read lease error") + } + if succ { + c.cacheData.Store(&cacheData{ + Lease: lease, + MemBuffer: data, + }) + } } } -// MockGetDataLease is only for test -func (c *cachedTable) MockGetDataLease() (uint64, uint64) { - tmp := c.cacheData.Load() - if tmp == nil { - return 0, 0 - } - data := tmp.(*cacheData) - return data.Start, data.Lease -} diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index 98d24686a5342..7de2bdffbbf82 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,12 +15,12 @@ package tables_test import ( + "github.com/pingcap/tidb/table/tables" "testing" "time" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) @@ -404,14 +404,13 @@ func TestRenewLease(t *testing.T) { tk.MustExec("alter table cache_renew_t cache") tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t")) require.NoError(t, err) - cacheTable := tbl.(table.CachedTable) var i int tk.MustExec("select * from cache_renew_t") - _, oldLease := cacheTable.MockGetDataLease() + _, oldLease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) for i = 0; i < 20; i++ { time.Sleep(200 * time.Millisecond) tk.MustExec("select * from cache_renew_t") - _, lease := cacheTable.MockGetDataLease() + _, lease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID) if lease != oldLease { break } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 344e31157a516..937010f7fea3d 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -308,9 +308,8 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, ts uint64) (bool, err record, ok := r.data[tid] if !ok { record = &stateRecord{ - lockLease: ts, - oldReadLease: ts, - lockType: CachedTableLockRead, + lockLease: ts, + lockType: CachedTableLockRead, } r.data[tid] = record return true, nil @@ -318,9 +317,10 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, ts uint64) (bool, err if record.lockType != CachedTableLockRead { return false, errors.New("The read lock can be renewed only in the read lock state") } - if record.lockLease > ts { - return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") + if record.lockLease <= ts { + record.lockLease = ts + return true, nil } - record.lockLease = ts - return true, nil + return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") + } From 29343793d3bd9360fb5b722a6059a36fa013bdc5 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Sun, 21 Nov 2021 17:28:16 +0800 Subject: [PATCH 13/19] draft_1 --- table/tables/cache.go | 1 - 1 file changed, 1 deletion(-) diff --git a/table/tables/cache.go b/table/tables/cache.go index 575530b77e877..b48ce10dae2f6 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -247,4 +247,3 @@ func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) } } } - From adb83cbb6f8eba8d34dba1dce3bbbbb4119f77ae Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Sun, 21 Nov 2021 17:37:40 +0800 Subject: [PATCH 14/19] draft_1 --- table/tables/cache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/table/tables/cache.go b/table/tables/cache.go index b48ce10dae2f6..1ab4495b1ff1d 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -96,6 +96,8 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { return nil } +// MockStateRemote represents the information of stateRemote. +// Exported it only for testing. var MockStateRemote = struct { Ch chan remoteTask Data *mockStateRemoteData From 2009265c754c7436054d65c9a60137f7dc3a4ddf Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Sun, 21 Nov 2021 17:57:35 +0800 Subject: [PATCH 15/19] draft_1 --- table/tables/cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index 7de2bdffbbf82..a435f10422e85 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,12 +15,12 @@ package tables_test import ( - "github.com/pingcap/tidb/table/tables" "testing" "time" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) From 9108e9e09e219d8608b151ff728af4bf9c4eea4d Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Sun, 21 Nov 2021 19:49:57 +0800 Subject: [PATCH 16/19] draft_1 --- domain/domain.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 33f356b92cf8b..0646144081c27 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -84,7 +84,7 @@ type Domain struct { sysVarCache sysVarCache // replaces GlobalVariableCache slowQuery *topNSlowQueries expensiveQueryHandle *expensivequery.Handle - wg util.WaitGroupWrapper + wg sync.WaitGroup statsUpdating sync2.AtomicInt32 cancel context.CancelFunc indexUsageSyncLease time.Duration @@ -682,7 +682,6 @@ func (do *Domain) Close() { if do.onClose != nil { do.onClose() } - logutil.BgLogger().Info("domain closed", zap.Duration("take time", time.Since(startTime))) } From a6f8369ceee567302042ec1d2cbc0ac5d0fd0bb0 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Mon, 22 Nov 2021 13:46:56 +0800 Subject: [PATCH 17/19] solve_comment --- domain/domain.go | 1 + table/tables/cache.go | 7 ++++--- table/tables/state_remote.go | 26 +++++++++++++++----------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 0646144081c27..c5d8dc8246cd0 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1738,6 +1738,7 @@ func (do *Domain) renewLease() { for { select { case <-do.exit: + close(do.renewLeaseCh) return case op := <-do.renewLeaseCh: op() diff --git a/table/tables/cache.go b/table/tables/cache.go index 1ab4495b1ff1d..c7e4cea2e8814 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -88,7 +88,7 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { nowTime := oracle.GetTimeFromTS(ts) distance := leaseTime.Sub(nowTime) // TODO make this configurable in the following PRs - if distance <= (1 * time.Second) { + if distance >= 0 && distance <= (1*time.Second) { c.renewCh <- c.renewLease(ts, RenewReadLease, data) } return data @@ -97,7 +97,7 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { } // MockStateRemote represents the information of stateRemote. -// Exported it only for testing. +// Exported it only for testing. var MockStateRemote = struct { Ch chan remoteTask Data *mockStateRemoteData @@ -237,12 +237,13 @@ func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) return func() { tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.RenewLease(tid, lease, op) + succ, err := c.handle.RenewLease(tid, ts, lease, op) if err != nil { log.Warn("Renew read lease error") } if succ { c.cacheData.Store(&cacheData{ + Start: data.Start, Lease: lease, MemBuffer: data, }) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 937010f7fea3d..a6e71865df40d 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -49,7 +49,7 @@ type StateRemote interface { LockForWrite(tid int64, now, ts uint64) error // RenewLease attempt to renew the read / write lock on the table with the specified tableID - RenewLease(tid int64, ts uint64, op RenewLeaseType) (bool, error) + RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) } // mockStateRemoteHandle implement the StateRemote interface. @@ -101,10 +101,10 @@ func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { return op.err } -func (r *mockStateRemoteHandle) RenewLease(tid int64, ts uint64, op RenewLeaseType) (bool, error) { +func (r *mockStateRemoteHandle) RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) { switch op { case RenewReadLease: - op := &renewLeaseForReadOP{tid: tid, ts: ts} + op := &renewLeaseForReadOP{tid: tid, oldTs: oldTs, newTs: newTs} op.Add(1) r.ch <- op op.Wait() @@ -183,7 +183,8 @@ type renewLeaseForReadOP struct { sync.WaitGroup // Input tid int64 - ts uint64 + oldTs uint64 + newTs uint64 // Output succ bool @@ -191,7 +192,7 @@ type renewLeaseForReadOP struct { } func (op *renewLeaseForReadOP) Exec(r *mockStateRemoteData) { - op.succ, op.err = r.renewLeaseForRead(op.tid, op.ts) + op.succ, op.err = r.renewLeaseForRead(op.tid, op.oldTs, op.newTs) op.Done() } @@ -304,23 +305,26 @@ func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, e return 0, nil } -func (r *mockStateRemoteData) renewLeaseForRead(tid int64, ts uint64) (bool, error) { +func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs uint64) (bool, error) { record, ok := r.data[tid] if !ok { record = &stateRecord{ - lockLease: ts, + lockLease: newTs, lockType: CachedTableLockRead, } r.data[tid] = record return true, nil } if record.lockType != CachedTableLockRead { - return false, errors.New("The read lock can be renewed only in the read lock state") + return false, errors.New("The read lock can be renewed only in the read lock state.") } - if record.lockLease <= ts { - record.lockLease = ts + if record.lockLease < oldTs { + return false, errors.New("The remote Lease is invalid.") + } + if record.lockLease <= newTs { + record.lockLease = newTs return true, nil } - return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") + return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation.") } From fedf01cd40579c0f44f5745700a496b6addf4438 Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Mon, 22 Nov 2021 13:57:10 +0800 Subject: [PATCH 18/19] solve_comment --- table/tables/state_remote.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index a6e71865df40d..33da997db685f 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -182,9 +182,9 @@ func (op *lockForWriteOP) Exec(data *mockStateRemoteData) { type renewLeaseForReadOP struct { sync.WaitGroup // Input - tid int64 + tid int64 oldTs uint64 - newTs uint64 + newTs uint64 // Output succ bool From 2000cc1f2b92c50c7bf3c892ebaa9efc8a61996a Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Mon, 22 Nov 2021 14:02:22 +0800 Subject: [PATCH 19/19] solve_comment --- table/tables/state_remote.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 33da997db685f..46a304127ec5e 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -316,15 +316,15 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs u return true, nil } if record.lockType != CachedTableLockRead { - return false, errors.New("The read lock can be renewed only in the read lock state.") + return false, errors.New("The read lock can be renewed only in the read lock state") } if record.lockLease < oldTs { - return false, errors.New("The remote Lease is invalid.") + return false, errors.New("The remote Lease is invalid") } if record.lockLease <= newTs { record.lockLease = newTs return true, nil } - return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation.") + return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation") }