Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support renew lease for read operation on cached table #29840

Merged
merged 21 commits into from
Nov 22, 2021
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) {
t4.State = model.StatePublic
db1.Tables = append(db1.Tables, t4)

builder, err := infoschema.NewBuilder(store).InitWithDBInfos(
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(
[]*model.DBInfo{db1, db2, dbP},
nil,
[]*model.PolicyInfo{p1, p2, p3, p4, p5},
Expand Down
31 changes: 24 additions & 7 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ type Domain struct {
serverID uint64
serverIDSession *concurrency.Session
isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false.

onClose func()
renewLeaseCh chan func() // It is used to call the renewLease function of the cache table.
onClose func()
}

// loadInfoSchema loads infoschema at startTS.
Expand Down Expand Up @@ -159,7 +159,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, currentSchemaVersion, nil, err
}

newISBuilder, err := infoschema.NewBuilder(do.Store()).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}
Expand Down Expand Up @@ -271,7 +271,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
diffs = append(diffs, diff)
}
builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.GetLatest())
builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh).InitWithOldInfoSchema(do.infoCache.GetLatest())
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
for _, diff := range diffs {
Expand All @@ -287,6 +287,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
actions = append(actions, uint64(1<<diff.Type))
}
}

is := builder.Build()
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
Expand Down Expand Up @@ -406,7 +407,6 @@ func (do *Domain) Reload() error {

// lease renew, so it must be executed despite it is cache or not
do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes)

lease := do.DDL().GetLease()
sub := time.Since(startTime)
// Reload interval is lease / 2, if load schema time elapses more than this interval,
Expand Down Expand Up @@ -700,6 +700,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
indexUsageSyncLease: idxUsageSyncLease,
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
onClose: onClose,
renewLeaseCh: make(chan func(), 10),
}

do.SchemaValidator = NewSchemaValidator(ddlLease, do)
Expand Down Expand Up @@ -824,11 +825,11 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ctx, ddlLease)
}
do.wg.Add(3)
do.wg.Add(4)
go do.topNSlowQueryLoop()
go do.infoSyncerKeeper()
go do.renewLease()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now you have used the var wg util.WaitGroupWrapper and you can simplify the code like here.

var wg util.WaitGroupWrapper

Copy link
Member

@hawkingrei hawkingrei Nov 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can only change the renewLease for using the Run of WaitGroupWrapper.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the other leave it as it is because WaitGroupWrapper is compatible with WaitGroup.

go do.globalConfigSyncerKeeper()

if !skipRegisterToDashboard {
do.wg.Add(1)
go do.topologySyncerKeeper()
Expand Down Expand Up @@ -1729,6 +1730,22 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {
do.infoCache.Insert(is, 0)
}

func (do *Domain) renewLease() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why a global gorountine

defer func() {
do.wg.Done()
logutil.BgLogger().Info("renew lease goroutine exited.")
}()
for {
select {
case <-do.exit:
close(do.renewLeaseCh)
return
case op := <-do.renewLeaseCh:
JayLZhou marked this conversation as resolved.
Show resolved Hide resolved
op()
}
}
}

func init() {
initByLDFlagsForGlobalKill()
}
Expand Down
2 changes: 1 addition & 1 deletion executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu
}

func newSlowQueryRetriever() (*slowQueryRetriever, error) {
newISBuilder, err := infoschema.NewBuilder(nil).InitWithDBInfos(nil, nil, nil, 0)
newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0)
if err != nil {
return nil, err
}
Expand Down
23 changes: 20 additions & 3 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Builder struct {
// TODO: store is only used by autoid allocators
// detach allocators from storage, use passed transaction in the feature
store kv.Storage
// TODO: renewLeaseCh is only used to pass data between table and domain
renewLeaseCh chan func()
}

// ApplyDiff applies SchemaDiff to the new InfoSchema.
Expand Down Expand Up @@ -438,7 +440,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
}
}
}
tbl, err := tables.TableFromMeta(allocs, tblInfo)
tbl, err := b.tableFromMeta(allocs, tblInfo)
JayLZhou marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -601,7 +603,7 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.
}

for _, di := range dbInfos {
err := b.createSchemaTablesForDB(di, tables.TableFromMeta)
err := b.createSchemaTablesForDB(di, b.tableFromMeta)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -622,6 +624,20 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.
return b, nil
}

func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newline before this function;
Comment like: // tableFromMeta wraps the tables.TableFromMeta function and add extra operation for cached table initialization

ret, err := tables.TableFromMeta(alloc, tblInfo)
if err != nil {
return nil, errors.Trace(err)
}
if t, ok := ret.(table.CachedTable); ok {
err = t.Init(b.renewLeaseCh)
if err != nil {
return nil, errors.Trace(err)
}
}
return ret, nil
}

type tableFromMetaFunc func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error)

func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableFromMetaFunc) error {
Expand Down Expand Up @@ -658,7 +674,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc)
}

// NewBuilder creates a new Builder with a Handle.
func NewBuilder(store kv.Storage) *Builder {
func NewBuilder(store kv.Storage, renewCh chan func()) *Builder {
return &Builder{
store: store,
is: &infoSchema{
Expand All @@ -667,6 +683,7 @@ func NewBuilder(store kv.Storage) *Builder {
ruleBundleMap: map[string]*placement.Bundle{},
sortedTablesBuckets: make([]sortedTables, bucketCount),
},
renewLeaseCh: renewCh,
}
}

Expand Down
6 changes: 3 additions & 3 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestBasic(t *testing.T) {
})
require.NoError(t, err)

builder, err := infoschema.NewBuilder(dom.Store()).InitWithDBInfos(dbInfos, nil, nil, 1)
builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1)
require.NoError(t, err)

txn, err := store.Begin()
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestInfoTables(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down Expand Up @@ -326,7 +326,7 @@ func TestGetBundle(t *testing.T) {
require.NoError(t, err)
}()

builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, nil, 0)
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
is := builder.Build()

Expand Down
2 changes: 2 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table
type CachedTable interface {
Table

Init(renewCh chan func()) error

// TryReadFromCache checks if the cache table is readable.
TryReadFromCache(ts uint64) kv.MemBuffer

Expand Down
61 changes: 54 additions & 7 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
Expand All @@ -29,6 +30,16 @@ import (
"github.com/tikv/client-go/v2/tikv"
)

// RenewLeaseType define the type for renew lease.
type RenewLeaseType int

const (
// RenewReadLease means renew read lease.
RenewReadLease RenewLeaseType = iota + 1
// RenewWriteLease means renew write lease.
RenewWriteLease
)

var (
_ table.Table = &cachedTable{}
_ table.CachedTable = &cachedTable{}
Expand All @@ -38,6 +49,7 @@ type cachedTable struct {
TableCommon
cacheData atomic.Value
handle StateRemote
renewCh chan func()
}

// cacheData pack the cache data and lease.
Expand Down Expand Up @@ -72,31 +84,48 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer {
}
data := tmp.(*cacheData)
if ts >= data.Start && ts < data.Lease {
leaseTime := oracle.GetTimeFromTS(data.Lease)
nowTime := oracle.GetTimeFromTS(ts)
distance := leaseTime.Sub(nowTime)
// TODO make this configurable in the following PRs
if distance >= 0 && distance <= (1*time.Second) {
c.renewCh <- c.renewLease(ts, RenewReadLease, data)
}
return data
}
return nil
}

var mockStateRemote = struct {
// MockStateRemote represents the information of stateRemote.
// Exported it only for testing.
var MockStateRemote = struct {
Ch chan remoteTask
Data *mockStateRemoteData
}{}

// NewCachedTable creates a new CachedTable Instance
func NewCachedTable(tbl *TableCommon) (table.Table, error) {
if mockStateRemote.Data == nil {
mockStateRemote.Data = newMockStateRemoteData()
mockStateRemote.Ch = make(chan remoteTask, 100)
go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch)
if MockStateRemote.Data == nil {
MockStateRemote.Data = newMockStateRemoteData()
MockStateRemote.Ch = make(chan remoteTask, 100)
go mockRemoteService(MockStateRemote.Data, MockStateRemote.Ch)
}

ret := &cachedTable{
TableCommon: *tbl,
handle: &mockStateRemoteHandle{mockStateRemote.Ch},
handle: &mockStateRemoteHandle{MockStateRemote.Ch},
renewCh: make(chan func()),
}

return ret, nil
}

// Init is an extra operation for cachedTable after TableFromMeta,
// Because cachedTable need some additional parameter that can't be passed in TableFromMeta.
func (c *cachedTable) Init(renewCh chan func()) error {
c.renewCh = renewCh
return nil
}

func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, error) {
buffer, err := newMemBuffer(store)
if err != nil {
Expand Down Expand Up @@ -203,3 +232,21 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
}
return c.TableCommon.RemoveRecord(ctx, h, r)
}

func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() {
return func() {
tid := c.Meta().ID
lease := leaseFromTS(ts)
succ, err := c.handle.RenewLease(tid, ts, lease, op)
if err != nil {
log.Warn("Renew read lease error")
}
if succ {
c.cacheData.Store(&cacheData{
JayLZhou marked this conversation as resolved.
Show resolved Hide resolved
Start: data.Start,
Lease: lease,
MemBuffer: data,
})
}
}
}
30 changes: 29 additions & 1 deletion table/tables/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -358,7 +361,7 @@ func TestCacheTableBatchPointGet(t *testing.T) {
tk.MustExec("insert into bp_cache_tmp1 values(2, 12, 102)")
tk.MustExec("insert into bp_cache_tmp1 values(3, 13, 103)")
tk.MustExec("insert into bp_cache_tmp1 values(4, 14, 104)")

tk.MustExec("alter table bp_cache_tmp1 cache")
// check point get out transaction
tk.MustQuery("select * from bp_cache_tmp1 where id in (1, 3)").Check(testkit.Rows("1 11 101", "3 13 103"))
tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 13)").Check(testkit.Rows("1 11 101", "3 13 103"))
Expand Down Expand Up @@ -389,3 +392,28 @@ func TestCacheTableBatchPointGet(t *testing.T) {
tk.MustQuery("select * from bp_cache_tmp1 where id in (1, 4)").Check(testkit.Rows("1 11 101"))
tk.MustQuery("select * from bp_cache_tmp1 where u in (11, 14)").Check(testkit.Rows("1 11 101"))
}

func TestRenewLease(t *testing.T) {
// Test RenewLeaseForRead
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
se := tk.Session()
tk.MustExec("create table cache_renew_t (id int)")
tk.MustExec("alter table cache_renew_t cache")
tbl, err := se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("cache_renew_t"))
require.NoError(t, err)
var i int
tk.MustExec("select * from cache_renew_t")
_, oldLease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID)
for i = 0; i < 20; i++ {
time.Sleep(200 * time.Millisecond)
tk.MustExec("select * from cache_renew_t")
_, lease, _ := tables.MockStateRemote.Data.Load(tbl.Meta().ID)
if lease != oldLease {
break
}
}
require.True(t, i < 20)
}
Loading