From 9c1f01f555c79bb0737a9bcae2251a8b42ff02a5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 26 Oct 2021 21:26:08 +0800 Subject: [PATCH 01/13] table/tables: add `StateRemote` interface for the cached table --- table/tables/state_remote.go | 243 +++++++++++++++++++++++++++++++++++ 1 file changed, 243 insertions(+) create mode 100644 table/tables/state_remote.go diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go new file mode 100644 index 0000000000000..7f18d8d692421 --- /dev/null +++ b/table/tables/state_remote.go @@ -0,0 +1,243 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables + +import ( + "context" + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" +) + +// CachedTableLockType define the lock type for cached table +type CachedTableLockType int + +const ( + // CachedTableLockNone means there is no lock. + CachedTableLockNone CachedTableLockType = iota + // CachedTableLockRead is the READ lock type. + CachedTableLockRead + // CachedTableLockIntend is the write INTEND, it exists when the changing READ to WRITE, and the READ lock lease is not expired.. + CachedTableLockIntend + // CachedTableLockWrite is the WRITE lock type. 
+ CachedTableLockWrite +) + +func (l CachedTableLockType) String() string { + switch l { + case CachedTableLockNone: + return "NONE" + case CachedTableLockRead: + return "READ" + case CachedTableLockIntend: + return "INTEND" + case CachedTableLockWrite: + return "WRITE" + } + panic("invalid CachedTableLockType value") +} + +// StateRemote is the interface to control the remote state of the cached table lock meta information. +type StateRemote interface { + Load(tid int) (CachedTableLockType, uint64, error) + LockForRead(tid int, now, ts uint64) (bool, error) + LockForWrite(tid int, now, ts uint64) error + RenewLease(tid int, ts uint64) (bool, error) +} + +type sqlExec interface { + AffectedRows() uint64 + ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error) +} + +type stateRemoteHandle struct { + exec sqlExec +} + +// NewStateRemote creates a StateRemote object. +func NewStateRemote(exec sqlExec) *stateRemoteHandle { + return &stateRemoteHandle{ + exec: exec, + } +} + +// CreateMetaLockForCachedTable initializes the cached table meta lock information. +func CreateMetaLockForCachedTable(h sqlExec) error { + createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + + "`tid` int(11) NOT NULL DEFAULT 0," + + "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + + "`lease` bigint(20) NOT NULL DEFAULT 0," + + "PRIMARY KEY (`tid`))" + _, err := h.ExecuteInternal(context.Background(), createTable) + return err +} + +// InitRow add a new record into the cached table meta lock table. 
+func InitRow(ctx context.Context, exec sqlExec, tid int) error { + _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0)", tid) + return err +} + +func (h *stateRemoteHandle) Load(ctx context.Context, tid int) (CachedTableLockType, uint64, error) { + return h.loadRow(ctx, tid) +} + +func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int, now, ts uint64) (bool, error) { + if err := h.beginTxn(ctx); err != nil { + return false, errors.Trace(err) + } + + lockType, lease, err := h.loadRow(ctx, tid) + if err != nil { + return false, errors.Trace(err) + } + + switch lockType { + case CachedTableLockNone: + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return false, errors.Trace(err) + } + if err := h.commitTxn(ctx); err != nil { + return false, errors.Trace(err) + } + case CachedTableLockRead: + // Update lease + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return false, errors.Trace(err) + } + if err := h.commitTxn(ctx); err != nil { + return false, errors.Trace(err) + } + case CachedTableLockWrite, CachedTableLockIntend: + if now > lease { + // Clear orphan lock + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return false, errors.Trace(err) + } + if err := h.commitTxn(ctx); err != nil { + return false, errors.Trace(err) + } + } else { + // Fail to lock for read, others hold the write lock. 
+ return false, nil + } + } + return true, nil +} + +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int, now, ts uint64) error { + if err := h.beginTxn(ctx); err != nil { + return errors.Trace(err) + } + + lockType, lease, err := h.loadRow(ctx, tid) + if err != nil { + return errors.Trace(err) + } + + switch lockType { + case CachedTableLockNone: + if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + if err := h.commitTxn(ctx); err != nil { + return errors.Trace(err) + } + case CachedTableLockRead: + // Change to READ to write INTEND + if err := h.updateRow(ctx, tid, "INTEND", lease); err != nil { + return errors.Trace(err) + } + if err := h.commitTxn(ctx); err != nil { + return errors.Trace(err) + } + + // Wait for lease to expire. + + // And then change the lock to WRITE + if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + // h.execSQL(ctx, "commit") + case CachedTableLockIntend, CachedTableLockWrite: + if now > lease { + // Clear orphan lock + if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + if err := h.commitTxn(ctx); err != nil { + return errors.Trace(err) + } + } else { + return fmt.Errorf("fail to lock for write, curr state = %v", lockType) + } + } + return err +} + +func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int, ts uint64) (bool, error) { + _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? 
and lock_type ='READ'", ts, tid) + if err != nil { + return false, errors.Trace(err) + } + succ := h.exec.AffectedRows() > 0 + return succ, err +} + +func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "begin") + return err +} + +func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "commit") + return err +} + +func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int) (CachedTableLockType, uint64, error) { + chunkRows, err := h.execSQL(ctx, "select lock_type, lease from mysql.table_cache_meta where tid = %? for update", tid) + if err != nil { + return 0, 0, errors.Trace(err) + } + if len(chunkRows) != 1 { + return 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid) + } + col1 := chunkRows[0].GetEnum(0) + // Note, the MySQL enum value start from 1 rather than 0 + lockType := CachedTableLockType(col1.Value - 1) + lease := chunkRows[0].GetUint64(1) + return lockType, lease, nil +} + +func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int, lockType string, lease uint64) error { + _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lock_type = %?, lease = %? where tid = %?", lockType, lease, tid) + return err +} + +func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { + rs, err := h.exec.ExecuteInternal(ctx, sql, args...) 
+ if rs != nil { + defer rs.Close() + } + if err != nil { + return nil, err + } + if rs != nil { + return sqlexec.DrainRecordSet(ctx, rs, 1) + } + return nil, nil +} From c4cd46021c88fe3130f1aa86946439b5ebb1c63c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 27 Oct 2021 09:58:34 +0800 Subject: [PATCH 02/13] add the test file --- table/tables/state_remote_test.go | 109 ++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 table/tables/state_remote_test.go diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go new file mode 100644 index 0000000000000..4f54593387437 --- /dev/null +++ b/table/tables/state_remote_test.go @@ -0,0 +1,109 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables_test + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +func TestStateRemote(t *testing.T) { + t.Parallel() + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + ctx := context.Background() + h := tables.NewStateRemote(se) + err := tables.CreateMetaLockForCachedTable(se) + require.NoError(t, err) + require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) + + // Check the initial value. 
+ require.NoError(t, tables.InitRow(ctx, se, 5)) + lockType, lease, err := h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockNone) + require.Equal(t, lockType.String(), "NONE") + require.Equal(t, lease, uint64(0)) + + // Check read lock. + succ, err := h.LockForRead(ctx, 5, 1234, 1234) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1234)) + + // LockForRead when the read lock is already held. + // This operation is equivalent to renewing the lease. + succ, err = h.LockForRead(ctx, 5, 1235, 1235) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1235)) + + // Renew read lock lease operation. + succ, err = h.RenewLease(ctx, 5, 1264) + require.NoError(t, err) + require.True(t, succ) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockRead) + require.Equal(t, lockType.String(), "READ") + require.Equal(t, lease, uint64(1264)) + + // Check write lock. + require.NoError(t, h.LockForWrite(ctx, 5, 2234, 2234)) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, uint64(2234)) + + // Lock for write again + require.NoError(t, h.LockForWrite(ctx, 5, 3234, 3234)) + lockType, lease, err = h.Load(ctx, 5) + require.NoError(t, err) + require.Equal(t, lockType, tables.CachedTableLockWrite) + require.Equal(t, lockType.String(), "WRITE") + require.Equal(t, lease, uint64(3234)) + + // Renewing the read lock lease should fail while the write lock is held. 
+ succ, err = h.RenewLease(ctx, 5, 1264) + require.NoError(t, err) + require.False(t, succ) + + // Acquire read lock should also fail when the write lock is hold. + succ, err = h.LockForRead(ctx, 5, 1264, 1264) + require.NoError(t, err) + require.False(t, succ) + + // But clear orphan write lock should success. + succ, err = h.LockForRead(ctx, 5, 4234, 4234) + require.NoError(t, err) + require.True(t, succ) +} From 35e63b3aca821314b302d72288883cbd9fb0bcf6 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 17 Nov 2021 10:47:36 +0800 Subject: [PATCH 03/13] adapt the meta lock code --- ddl/ddl_api.go | 7 +- domain/domain.go | 18 ++- executor/builder.go | 2 +- infoschema/builder.go | 35 ++++- meta/autoid/autoid.go | 4 + planner/core/logical_plan_builder.go | 2 +- session/bootstrap.go | 20 ++- session/session.go | 5 + session/tidb.go | 3 + table/table.go | 4 +- table/tables/cache.go | 34 +++-- table/tables/state_remote.go | 183 ++++++++++++++++++--------- table/tables/tables.go | 2 +- 13 files changed, 226 insertions(+), 93 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 0129d7d1d8d90..ab680e93bea1c 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -6611,7 +6612,11 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error) err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) + return err } func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error) { diff --git 
a/domain/domain.go b/domain/domain.go index 8ed2c18e58cd4..55f085dc2dfa0 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -95,6 +95,7 @@ type Domain struct { isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. onClose func() + sysFactory func(*Domain) (pools.Resource, error) } // loadInfoSchema loads infoschema at startTS. @@ -159,7 +160,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store()).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -173,6 +174,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return is, false, currentSchemaVersion, nil, nil } +func (do *Domain) sysFacHack() (pools.Resource, error) { + // TODO: Here we create new sessions with sysFac in DDL, + // which will use `do` as Domain instead of call `domap.Get`. + // That's because `domap.Get` requires a lock, but before + // we initialize Domain finish, we can't require that again. + // After we remove the lazy logic of creating Domain, we + // can simplify code here. 
+ return do.sysFactory(do) +} + func (do *Domain) fetchPolicies(m *meta.Meta) ([]*model.PolicyInfo, error) { allPolicies, err := m.ListPolicies() if err != nil { @@ -271,7 +282,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -711,6 +722,7 @@ const serverIDForStandalone = 1 // serverID for standalone deployment. // Init initializes a domain. func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error { + do.sysFactory = sysFactory perfschema.Init() if ebd, ok := do.store.(kv.EtcdBackend); ok { var addrs []string @@ -789,7 +801,9 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R return err } // step 2: domain reload the infoSchema. 
+ fmt.Println("before domain.Reload() ...") err = do.Reload() + fmt.Println("after domain.Reload() ...") if err != nil { return err } diff --git a/executor/builder.go b/executor/builder.go index d42d584bbb982..27289ef9221fc 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -4685,7 +4685,7 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64 } }() if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { - err := tbl.(table.CachedTable).UpdateLockForRead(b.ctx.GetStore(), startTS) + err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS) if err != nil { log.Warn("Update Lock Info Error") } diff --git a/infoschema/builder.go b/infoschema/builder.go index d3c532a2bfffb..26102fd7b8ab7 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -20,6 +20,7 @@ import ( "sort" "strings" + "github.com/ngaut/pools" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -33,6 +34,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/domainutil" + "github.com/pingcap/tidb/util/sqlexec" ) // Builder builds a new InfoSchema. @@ -40,7 +42,8 @@ type Builder struct { is *infoSchema // TODO: store is only used by autoid allocators // detach allocators from storage, use passed transaction in the feature - store kv.Storage + store kv.Storage + factory func() (pools.Resource, error) } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -438,7 +441,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i } } } - tbl, err := tables.TableFromMeta(allocs, tblInfo) + tbl, err := b.tableFromMeta(allocs, tblInfo) if err != nil { return nil, errors.Trace(err) } @@ -601,7 +604,7 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. 
} for _, di := range dbInfos { - err := b.createSchemaTablesForDB(di, tables.TableFromMeta) + err := b.createSchemaTablesForDB(di, b.tableFromMeta) if err != nil { return nil, errors.Trace(err) } @@ -622,6 +625,27 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. return b, nil } +func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) { + ret, err := tables.TableFromMeta(alloc, tblInfo) + if err != nil { + return nil, errors.Trace(err) + } + if t, ok := ret.(table.CachedTable); ok { + var tmp pools.Resource + tmp, err = b.factory() + if err != nil { + return nil, errors.Trace(err) + } + fmt.Println("wrap table from meta to handle .... cached table!!!") + err = t.Init(tmp.(sqlexec.SQLExecutor)) + fmt.Println("wrap table from meta to handle .... finish!!!") + if err != nil { + return nil, errors.Trace(err) + } + } + return ret, nil +} + type tableFromMetaFunc func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableFromMetaFunc) error { @@ -658,9 +682,10 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(store kv.Storage) *Builder { +func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder { return &Builder{ - store: store, + factory: factory, + store: store, is: &infoSchema{ schemaMap: map[string]*schemaTables{}, policyMap: map[string]*model.PolicyInfo{}, diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 8ee99f5939a0e..96d13d3bee48d 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -467,6 +467,10 @@ func (alloc *allocator) GetType() AllocatorType { return alloc.allocType } +func (alloc *allocator) GetStore() kv.Storage { + return alloc.store +} + // NextStep return new auto id step according to previous step and consuming time. 
func NextStep(curStep int64, consumeDur time.Duration) int64 { failpoint.Inject("mockAutoIDCustomize", func(val failpoint.Value) { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 7e8bd7ba6688c..99777f38e9234 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4164,7 +4164,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as } }() if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { - err := cachedTable.UpdateLockForRead(b.ctx.GetStore(), txn.StartTS()) + err := cachedTable.UpdateLockForRead(ctx, b.ctx.GetStore(), txn.StartTS()) if err != nil { log.Warn("Update Lock Info Error") } diff --git a/session/bootstrap.go b/session/bootstrap.go index 573c17bdaaf2e..228be29984c60 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -357,6 +357,14 @@ const ( last_analyzed_at TIMESTAMP, PRIMARY KEY (table_id, column_id) CLUSTERED );` + + CreateTableCacheMetaTable = `CREATE TABLE IF NOT EXISTS mysql.table_cache_meta ( + tid int(11) NOT NULL DEFAULT 0, + lock_type enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE', + lease bigint(20) NOT NULL DEFAULT 0, + oldReadLease bigint(20) NOT NULL DEFAULT 0, + PRIMARY KEY (tid) + );` ) // bootstrap initiates system DB for a store. @@ -528,11 +536,13 @@ const ( version77 = 77 // version78 updates mysql.stats_buckets.lower_bound, mysql.stats_buckets.upper_bound and mysql.stats_histograms.last_analyze_pos from BLOB to LONGBLOB. version78 = 78 + // version79 adds the mysql.table_cache_meta table + version79 = 79 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. 
// please make sure this is the largest version -var currentBootstrapVersion int64 = version78 +var currentBootstrapVersion int64 = version79 var ( bootstrapVersion = []func(Session, int64){ @@ -614,6 +624,7 @@ var ( upgradeToVer76, upgradeToVer77, upgradeToVer78, + upgradeToVer79, } ) @@ -1612,6 +1623,13 @@ func upgradeToVer78(s Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms MODIFY last_analyze_pos LONGBLOB DEFAULT NULL") } +func upgradeToVer79(s Session, ver int64) { + if ver >= version79 { + return + } + doReentrantDDL(s, CreateTableCacheMetaTable) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, diff --git a/session/session.go b/session/session.go index 83c936ccf30b2..51e090f577e37 100644 --- a/session/session.go +++ b/session/session.go @@ -2444,6 +2444,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { runInBootstrapSession(store, upgrade) } + _, err := domap.Get(store) + if err != nil { + return nil, err + } + se, err := createSession(store) if err != nil { return nil, err diff --git a/session/tidb.go b/session/tidb.go index 911e64f3727f2..a2c4a40c12458 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -19,6 +19,7 @@ package session import ( + "fmt" "context" "sync" "sync/atomic" @@ -80,7 +81,9 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { dm.Delete(store) } d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose) + fmt.Println("finish new domain>>>>>>>>") err1 = d.Init(ddlLease, sysFactory) + fmt.Println("init domain>>>>>>>>") if err1 != nil { // If we don't clean it, there are some dirty data when retrying the function of Init. 
d.Close() diff --git a/table/table.go b/table/table.go index f39e9b2d9fa8d..1e2d4f4614538 100644 --- a/table/table.go +++ b/table/table.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/sqlexec" ) // Type is used to distinguish between different tables that store data in different ways. @@ -252,10 +253,11 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table + Init(exec sqlexec.SQLExecutor) error // TryReadFromCache checks if the cache table is readable. TryReadFromCache(ts uint64) kv.MemBuffer // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table - UpdateLockForRead(store kv.Storage, ts uint64) error + UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error } diff --git a/table/tables/cache.go b/table/tables/cache.go index ba8786a65bae3..ebe96ee0bd917 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -25,12 +25,12 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" ) var ( - _ table.Table = &cachedTable{} _ table.CachedTable = &cachedTable{} ) @@ -77,26 +77,24 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { return nil } -var mockStateRemote = struct { - Ch chan remoteTask - Data *mockStateRemoteData -}{} - // NewCachedTable creates a new CachedTable Instance -func NewCachedTable(tbl *TableCommon) (table.Table, error) { - if mockStateRemote.Data == nil { - mockStateRemote.Data = newMockStateRemoteData() - mockStateRemote.Ch = make(chan remoteTask, 100) - go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch) - } +func newCachedTable(tbl *TableCommon) (table.Table, 
error) { ret := &cachedTable{ TableCommon: *tbl, - handle: &mockStateRemoteHandle{mockStateRemote.Ch}, } return ret, nil } +func (c *cachedTable) Init(exec sqlexec.SQLExecutor) error { + raw, ok := exec.(sqlExec) + if !ok { + return errors.New("Need sqlExec rather than sqlexec.SQLExecutor") + } + c.handle = NewStateRemote(raw) + return nil +} + func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, error) { buffer, err := newMemBuffer(store) if err != nil { @@ -138,11 +136,11 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) return buffer, startTS, nil } -func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error { +func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error { // Load data from original table and the update lock information. tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.LockForRead(tid, ts, lease) + succ, err := c.handle.LockForRead(ctx, tid, ts, lease) if err != nil { return errors.Trace(err) } @@ -169,7 +167,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. 
return nil, err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) if err != nil { return nil, errors.Trace(err) } @@ -183,7 +181,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, return err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -197,7 +195,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } now := txn.StartTS() - err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 36a1a412ae97c..86f764890d8d0 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" ) @@ -57,16 +58,16 @@ func (l CachedTableLockType) String() string { // StateRemote is the interface to control the remote state of the cached table's lock meta information. 
type StateRemote interface { // Load obtain the corresponding lock type and lease value according to the tableID - Load(tid int64) (CachedTableLockType, uint64, error) + Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) // LockForRead try to add a read lock to the table with the specified tableID - LockForRead(tid int64, now, ts uint64) (bool, error) + LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(tid int64, now, ts uint64) error + LockForWrite(ctx context.Context, tid int64, now, ts uint64) error // RenewLease attempt to renew the read lock on the table with the specified tableID - RenewLease(tid int64, ts uint64) (bool, error) + RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error) } // mockStateRemoteHandle implement the StateRemote interface. @@ -74,7 +75,9 @@ type mockStateRemoteHandle struct { ch chan remoteTask } -func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, error) { +var _ StateRemote = &mockStateRemoteHandle{} + +func (r *mockStateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { op := &loadOP{tid: tid} op.Add(1) r.ch <- op @@ -82,7 +85,7 @@ func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, er return op.lockType, op.lease, op.err } -func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, error) { +func (r *mockStateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) { op := &lockForReadOP{tid: tid, now: now, ts: ts} op.Add(1) r.ch <- op @@ -90,7 +93,7 @@ func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, er return op.succ, op.err } -func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { +func (r *mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { op 
:= &lockForWriteOP{tid: tid, now: now, ts: ts} op.Add(1) r.ch <- op @@ -118,7 +121,7 @@ func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error { return op.err } -func (r *mockStateRemoteHandle) RenewLease(tid int64, ts uint64) (bool, error) { +func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error) { return false, errors.New("not implemented yet") } @@ -195,11 +198,11 @@ type stateRecord struct { lockType CachedTableLockType } -func newMockStateRemoteData() *mockStateRemoteData { - return &mockStateRemoteData{ - data: make(map[int64]*stateRecord), - } -} +// func newMockStateRemoteData() *mockStateRemoteData { +// return &mockStateRemoteData{ +// data: make(map[int64]*stateRecord), +// } +// } func (r *mockStateRemoteData) Load(tid int64) (CachedTableLockType, uint64, error) { record, ok := r.data[tid] @@ -297,6 +300,7 @@ func (r *mockStateRemoteData) LockForWrite(tid int64, now, ts uint64) (uint64, e type sqlExec interface { AffectedRows() uint64 ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error) + GetStore() kv.Storage } type stateRemoteHandle struct { @@ -310,6 +314,8 @@ func NewStateRemote(exec sqlExec) *stateRemoteHandle { } } +var _ StateRemote = &stateRemoteHandle{} + // CreateMetaLockForCachedTable initializes the cached table meta lock information. // func CreateMetaLockForCachedTable(h sqlExec) error { // createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + @@ -321,22 +327,23 @@ func NewStateRemote(exec sqlExec) *stateRemoteHandle { // return err // } -// InitRow add a new record into the cached table meta lock table. -func InitRow(ctx context.Context, exec sqlExec, tid int) error { - _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0)", tid) - return err -} +// // InitRow add a new record into the cached table meta lock table. 
+// func InitRow(ctx context.Context, exec sqlExec, tid int) error { +// _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0)", tid) +// return err +// } -func (h *stateRemoteHandle) Load(ctx context.Context, tid int) (CachedTableLockType, uint64, error) { - return h.loadRow(ctx, tid) +func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { + lockType, lease, _, err := h.loadRow(ctx, tid) + return lockType, lease, err } -func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int, now, ts uint64) (bool, error) { +func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) { if err := h.beginTxn(ctx); err != nil { return false, errors.Trace(err) } - lockType, lease, err := h.loadRow(ctx, tid) + lockType, lease, _, err := h.loadRow(ctx, tid) if err != nil { return false, errors.Trace(err) } @@ -374,57 +381,100 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int, now, ts ui return true, nil } -func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int, now, ts uint64) error { - if err := h.beginTxn(ctx); err != nil { - return errors.Trace(err) +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { + for { + retry, err := h.lockForWriteOnce(ctx, tid, now, ts) + if err != nil { + return err + } + if !retry { + break + } + + store := h.exec.GetStore() + o := store.GetOracle() + newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) + if err != nil { + return errors.Trace(err) + } + now, ts = newTS, leaseFromTS(newTS) } + return nil +} - lockType, lease, err := h.loadRow(ctx, tid) +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) (/*retry*/ bool, error) { + err := h.beginTxn(ctx); if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } + defer func() { + if err != 
nil { + h.rollbackTxn(ctx) + } + }() - switch lockType { - case CachedTableLockNone: + lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) + if err != nil { + return false, errors.Trace(err) + } + // The lease is outdated, so lock is invalid, clear orphan lock of any kind. + if now > lease { if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } - if err := h.commitTxn(ctx); err != nil { - return errors.Trace(err) + return false, h.commitTxn(ctx) + } + + // The lease is valid. + switch lockType { + case CachedTableLockNone: + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return false, errors.Trace(err) } + return false, h.commitTxn(ctx) case CachedTableLockRead: - // Change to READ to write INTEND - if err := h.updateRow(ctx, tid, "INTEND", lease); err != nil { - return errors.Trace(err) + // Change from READ to INTEND + if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil { + return false, errors.Trace(err) } - if err := h.commitTxn(ctx); err != nil { - return errors.Trace(err) + if err = h.commitTxn(ctx); err != nil { + return false, errors.Trace(err) } - // Wait for lease to expire. - - // And then change the lock to WRITE - if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return errors.Trace(err) - } - // h.execSQL(ctx, "commit") + // Wait for lease to expire, and then retry. + waitForLeaseExpire(oldReadLease, now) + return true, nil case CachedTableLockIntend, CachedTableLockWrite: - if now > lease { - // Clear orphan lock - if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return errors.Trace(err) - } - if err := h.commitTxn(ctx); err != nil { - return errors.Trace(err) + // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. 
+ if now > oldReadLease { + if lockType == CachedTableLockIntend { + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return false, errors.Trace(err) + } } - } else { - return fmt.Errorf("fail to lock for write, curr state = %v", lockType) + return false, h.commitTxn(ctx) } + + // Otherwise, the WRITE should wait for the READ lease expire. + h.rollbackTxn(ctx) + waitForLeaseExpire(oldReadLease, now) + // And then retry change the lock to WRITE + return true, nil + } + return false, errors.New("should never run here") +} + +func waitForLeaseExpire(oldReadLease, now uint64) { + if oldReadLease >= now { + t1 := oracle.GetTimeFromTS(oldReadLease) + t2 := oracle.GetTimeFromTS(now) + waitDuration := t1.Sub(t2) + fmt.Println("sleep ... for old read lease to expire", waitDuration) + time.Sleep(waitDuration) } - return err } -func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int, ts uint64) (bool, error) { +func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error) { _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? and lock_type ='READ'", ts, tid) if err != nil { return false, errors.Trace(err) @@ -435,30 +485,39 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int, ts uint64) func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "begin") + fmt.Println("begin ... executed ... ", h.exec) return err } func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "commit") + fmt.Println("commit ... executed ... ", h.exec) + return err +} + +func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { + _, err := h.execSQL(ctx, "rollback") + fmt.Println("rollback ... executed ... 
", h.exec) return err } -func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int) (CachedTableLockType, uint64, error) { - chunkRows, err := h.execSQL(ctx, "select lock_type, lease from mysql.table_cache_meta where tid = %? for update", tid) +func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) { + chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid) if err != nil { - return 0, 0, errors.Trace(err) + return 0, 0, 0, errors.Trace(err) } if len(chunkRows) != 1 { - return 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid) + return 0, 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid) } col1 := chunkRows[0].GetEnum(0) // Note, the MySQL enum value start from 1 rather than 0 lockType := CachedTableLockType(col1.Value - 1) lease := chunkRows[0].GetUint64(1) - return lockType, lease, nil + oldReadLease := chunkRows[0].GetUint64(2) + return lockType, lease, oldReadLease, nil } -func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int, lockType string, lease uint64) error { +func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int64, lockType string, lease uint64) error { _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lock_type = %?, lease = %? 
where tid = %?", lockType, lease, tid) return err } @@ -469,7 +528,7 @@ func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...int defer rs.Close() } if err != nil { - return nil, err + return nil, errors.Trace(err) } if rs != nil { return sqlexec.DrainRecordSet(ctx, rs, 1) diff --git a/table/tables/tables.go b/table/tables/tables.go index ecb7e23137745..9d263f99c0516 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -146,7 +146,7 @@ func TableFromMeta(allocs autoid.Allocators, tblInfo *model.TableInfo) (table.Ta return nil, err } if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable { - return NewCachedTable(&t) + return newCachedTable(&t) } return &t, nil } From 3b5586720956b9ce026ebf9093c4e6c27c41cb3e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 17 Nov 2021 17:30:05 +0800 Subject: [PATCH 04/13] clean up some debug info --- ddl/ddl_api.go | 4 +-- domain/domain.go | 4 +-- infoschema/builder.go | 6 ++-- session/tidb.go | 3 -- table/tables/state_remote.go | 56 +++++++++---------------------- table/tables/state_remote_test.go | 23 +++++++++++-- 6 files changed, 42 insertions(+), 54 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index ab680e93bea1c..2381c22e9fd05 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -47,7 +47,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -58,6 +57,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/set" + "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" ) @@ -6615,7 +6615,7 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error) if err != nil { return errors.Trace(err) } - _, err = 
ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) + // _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) return err } diff --git a/domain/domain.go b/domain/domain.go index 55f085dc2dfa0..e5af4bf5eacb7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -94,7 +94,7 @@ type Domain struct { serverIDSession *concurrency.Session isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. - onClose func() + onClose func() sysFactory func(*Domain) (pools.Resource, error) } @@ -801,9 +801,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R return err } // step 2: domain reload the infoSchema. - fmt.Println("before domain.Reload() ...") err = do.Reload() - fmt.Println("after domain.Reload() ...") if err != nil { return err } diff --git a/infoschema/builder.go b/infoschema/builder.go index 26102fd7b8ab7..54b408a8ade71 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -42,7 +42,7 @@ type Builder struct { is *infoSchema // TODO: store is only used by autoid allocators // detach allocators from storage, use passed transaction in the feature - store kv.Storage + store kv.Storage factory func() (pools.Resource, error) } @@ -636,9 +636,7 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf if err != nil { return nil, errors.Trace(err) } - fmt.Println("wrap table from meta to handle .... cached table!!!") err = t.Init(tmp.(sqlexec.SQLExecutor)) - fmt.Println("wrap table from meta to handle .... 
finish!!!") if err != nil { return nil, errors.Trace(err) } @@ -685,7 +683,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder { return &Builder{ factory: factory, - store: store, + store: store, is: &infoSchema{ schemaMap: map[string]*schemaTables{}, policyMap: map[string]*model.PolicyInfo{}, diff --git a/session/tidb.go b/session/tidb.go index a2c4a40c12458..911e64f3727f2 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -19,7 +19,6 @@ package session import ( - "fmt" "context" "sync" "sync/atomic" @@ -81,9 +80,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { dm.Delete(store) } d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose) - fmt.Println("finish new domain>>>>>>>>") err1 = d.Init(ddlLease, sysFactory) - fmt.Println("init domain>>>>>>>>") if err1 != nil { // If we don't clean it, there are some dirty data when retrying the function of Init. d.Close() diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index b525aaa5ea179..14b9edec3166a 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -21,8 +21,8 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" ) @@ -316,58 +316,38 @@ func NewStateRemote(exec sqlExec) *stateRemoteHandle { var _ StateRemote = &stateRemoteHandle{} -// CreateMetaLockForCachedTable initializes the cached table meta lock information. 
-// func CreateMetaLockForCachedTable(h sqlExec) error { -// createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + -// "`tid` int(11) NOT NULL DEFAULT 0," + -// "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + -// "`lease` bigint(20) NOT NULL DEFAULT 0," + -// "PRIMARY KEY (`tid`))" -// _, err := h.ExecuteInternal(context.Background(), createTable) -// return err -// } - -// // InitRow add a new record into the cached table meta lock table. -// func InitRow(ctx context.Context, exec sqlExec, tid int) error { -// _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0)", tid) -// return err -// } - func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) { lockType, lease, _, err := h.loadRow(ctx, tid) return lockType, lease, err } -func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (/*succ*/ bool, error) { +func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) { succ := false err := h.runInTxn(ctx, func(ctx context.Context) error { lockType, lease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - - switch lockType { - case CachedTableLockNone: + // The old lock is outdated, clear orphan lock. + if now > lease { + succ = true if err := h.updateRow(ctx, tid, "READ", ts); err != nil { return errors.Trace(err) } + return nil + } + + switch lockType { + case CachedTableLockNone: case CachedTableLockRead: - // Update lease - if err := h.updateRow(ctx, tid, "READ", ts); err != nil { - return errors.Trace(err) - } case CachedTableLockWrite, CachedTableLockIntend: - if now > lease { - // Clear orphan lock - if err := h.updateRow(ctx, tid, "READ", ts); err != nil { - return errors.Trace(err) - } - } - // Fail to lock for read, others hold the write lock. 
return nil } - succ = true + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return errors.Trace(err) + } + return nil }) return succ, err @@ -394,8 +374,8 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts return nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) (/*retry*/ bool, error) { - err := h.beginTxn(ctx); +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) ( /*retry*/ bool, error) { + err := h.beginTxn(ctx) if err != nil { return false, errors.Trace(err) } @@ -461,7 +441,6 @@ func waitForLeaseExpire(oldReadLease, now uint64) { t1 := oracle.GetTimeFromTS(oldReadLease) t2 := oracle.GetTimeFromTS(now) waitDuration := t1.Sub(t2) - fmt.Println("sleep ... for old read lease to expire", waitDuration) time.Sleep(waitDuration) } } @@ -477,19 +456,16 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64 func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "begin") - fmt.Println("begin ... executed ... ", h.exec) return err } func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "commit") - fmt.Println("commit ... executed ... ", h.exec) return err } func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "rollback") - fmt.Println("rollback ... executed ... ", h.exec) return err } diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index 4f54593387437..43ff7e879649c 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -18,11 +18,30 @@ import ( "context" "testing" + "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) +// CreateMetaLockForCachedTable initializes the cached table meta lock information. 
+func createMetaLockForCachedTable(h session.Session) error { + createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + + "`tid` int(11) NOT NULL DEFAULT 0," + + "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + + "`lease` bigint(20) NOT NULL DEFAULT 0," + + "`oldReadLease` bigint(20) NOT NULL DEFAULT 0," + + "PRIMARY KEY (`tid`))" + _, err := h.ExecuteInternal(context.Background(), createTable) + return err +} + +// InitRow add a new record into the cached table meta lock table. +func initRow(ctx context.Context, exec session.Session, tid int) error { + _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", tid) + return err +} + func TestStateRemote(t *testing.T) { t.Parallel() store, clean := testkit.CreateMockStore(t) @@ -33,12 +52,12 @@ func TestStateRemote(t *testing.T) { ctx := context.Background() h := tables.NewStateRemote(se) - err := tables.CreateMetaLockForCachedTable(se) + err := createMetaLockForCachedTable(se) require.NoError(t, err) require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) // Check the initial value. 
- require.NoError(t, tables.InitRow(ctx, se, 5)) + require.NoError(t, initRow(ctx, se, 5)) lockType, lease, err := h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockNone) From 7c7022bad6d1d39f5d248d8cb6c626eab45c00e4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 19 Nov 2021 12:22:34 +0800 Subject: [PATCH 05/13] fix several bugs --- ddl/ddl_api.go | 13 ++++- executor/builder.go | 48 ++++++++++------- planner/core/logical_plan_builder.go | 52 +++++++++++++----- session/bootstrap.go | 10 ++-- session/session.go | 4 ++ table/tables/cache.go | 3 ++ table/tables/cache_test.go | 81 ++++++++++++++++++---------- table/tables/state_remote.go | 14 +++++ 8 files changed, 161 insertions(+), 64 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 2381c22e9fd05..7f94fd8176226 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -6615,7 +6615,18 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error) if err != nil { return errors.Trace(err) } - // _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) + + _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) + + // exec := ctx.(sqlexec.RestrictedSQLExecutor) + // stmt, err := exec.ParseWithParams(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) + // if err != nil { + // return errors.Trace(err) + // } + // _, _, err = exec.ExecRestrictedStmt(context.Background(), stmt) + if err != nil { + return errors.Trace(err) + } return err } diff --git a/executor/builder.go b/executor/builder.go index 8c8732841005f..fd98291411f11 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -19,6 +19,7 @@ import ( "context" "math" "sort" + "fmt" "strconv" "strings" "sync" @@ -29,7 +30,7 @@ import ( 
"github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" - "github.com/pingcap/log" + // "github.com/pingcap/log" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor/aggfuncs" @@ -1925,6 +1926,8 @@ func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor } func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { + b.inUpdateStmt = true + fmt.Println("run here in build update === ", b.inUpdateStmt) tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) multiUpdateOnSameTable := make(map[int64]bool) for _, info := range v.TblColPosInfos { @@ -1965,7 +1968,6 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { if b.err != nil { return nil } - b.inUpdateStmt = true updateExec := &UpdateExec{ baseExecutor: base, OrderedList: v.OrderedList, @@ -1998,6 +2000,8 @@ func getAssignFlag(ctx sessionctx.Context, v *plannercore.Update, schemaLen int) } func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { + b.inDeleteStmt = true + fmt.Println("in delete stmt === true set!!") tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) for _, info := range v.TblColPosInfos { tblID2table[info.TblID], _ = b.is.TableByID(info.TblID) @@ -2010,7 +2014,6 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { if b.err != nil { return nil } - b.inDeleteStmt = true base := newBaseExecutor(b.ctx, v.Schema(), v.ID(), selExec) base.initCap = chunk.ZeroCapacity deleteExec := &DeleteExec{ @@ -4675,25 +4678,32 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64 b.err = errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(b.ctx.GetSessionVars().CurrentDB, tblInfo.Name)) return nil } - cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS) + var ctbl table.CachedTable = tbl.(table.CachedTable) + cacheData := ctbl.TryReadFromCache(startTS) if 
cacheData != nil { b.ctx.GetSessionVars().StmtCtx.ReadFromTableCache = true return cacheData - } - go func() { - defer func() { - if r := recover(); r != nil { - logutil.BgLogger().Error("panic in the recoverable goroutine", - zap.Reflect("r", r), - zap.Stack("stack trace")) - } - }() + } else { if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { - err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS) - if err != nil { - log.Warn("Update Lock Info Error") - } - } - }() + fmt.Println("what the fuc?? Update Lock For Read !!!!!!!!!!!!!", b.inDeleteStmt, b.inUpdateStmt) + go plannercore.XXX(context.Background(), ctbl, b.ctx.GetStore(), startTS) + } + + // go func() { + // defer func() { + // if r := recover(); r != nil { + // logutil.BgLogger().Error("panic in the recoverable goroutine", + // zap.Reflect("r", r), + // zap.Stack("stack trace")) + // } + // }() + // if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { + // err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS) + // if err != nil { + // log.Warn("Update Lock Info Error") + // } + // } + // }() + } return nil } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 99777f38e9234..cf3e49dbad787 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,9 +27,11 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" - "github.com/pingcap/log" + "golang.org/x/sync/singleflight" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" + "go.uber.org/zap" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" @@ -4158,18 +4160,26 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as us.SetChildren(ds) result = us } else 
{ - go func() { - defer func() { - if r := recover(); r != nil { - } - }() - if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { - err := cachedTable.UpdateLockForRead(ctx, b.ctx.GetStore(), txn.StartTS()) - if err != nil { - log.Warn("Update Lock Info Error") - } - } - }() + if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { + fmt.Println("llllllllalalalalalllllllllllalal Update Lock For Read !!!!!!!!!!!!!") + go XXX(ctx, cachedTable, b.ctx.GetStore(), txn.StartTS()) + } + // go func() { + // defer func() { + // if r := recover(); r != nil { + // } + // }() + // if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { + // _, err, _ := sf.Do(fmt.Sprintf("%d", tableInfo.ID), func() (interface{}, error) { + // fmt.Println("llllllllalalalalalllllllllllalal Update Lock For Read !!!!!!!!!!!!!") + // err := cachedTable.UpdateLockForRead(ctx, b.ctx.GetStore(), txn.StartTS()) + // return nil, err + // }) + // if err != nil { + // logutil.BgLogger().Warn("Update Lock Info Error", zap.Error(err)) + // } + // } + // }() } } @@ -4195,6 +4205,22 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as return result, nil } +var sf singleflight.Group + +func XXX(ctx context.Context, tbl table.CachedTable, store kv.Storage, startTS uint64) { + defer func() { + if r := recover(); r != nil { + } + }() + _, err, _ := sf.Do(fmt.Sprintf("%d", tbl.Meta().ID), func() (interface{}, error) { + err := tbl.UpdateLockForRead(ctx, store, startTS) + return nil, err + }) + if err != nil { + logutil.BgLogger().Warn("Update Lock Info Error", zap.Error(err)) + } +} + func (b *PlanBuilder) timeRangeForSummaryTable() QueryTimeRange { const defaultSummaryDuration = 30 * time.Minute hints := b.TableHints() diff --git a/session/bootstrap.go b/session/bootstrap.go index 228be29984c60..c4743c47b4d22 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -357,7 
+357,7 @@ const ( last_analyzed_at TIMESTAMP, PRIMARY KEY (table_id, column_id) CLUSTERED );` - + // CreateTableCacheMetaTable stores the cached table meta lock information. CreateTableCacheMetaTable = `CREATE TABLE IF NOT EXISTS mysql.table_cache_meta ( tid int(11) NOT NULL DEFAULT 0, lock_type enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE', @@ -1706,12 +1706,14 @@ func doDDLWorks(s Session) { mustExecute(s, CreateSchemaIndexUsageTable) // Create stats_fm_sketch table. mustExecute(s, CreateStatsFMSketchTable) - // Create global_grants + // Create global_grants. mustExecute(s, CreateGlobalGrantsTable) - // Create capture_plan_baselines_blacklist + // Create capture_plan_baselines_blacklist. mustExecute(s, CreateCapturePlanBaselinesBlacklist) - // Create column_stats_usage table + // Create column_stats_usage table. mustExecute(s, CreateColumnStatsUsageTable) + // Create table_cache_meta table. + mustExecute(s, CreateTableCacheMetaTable) } // doDMLWorks executes DML statements in bootstrap stage. diff --git a/session/session.go b/session/session.go index 51e090f577e37..35a017c87d76e 100644 --- a/session/session.go +++ b/session/session.go @@ -26,6 +26,7 @@ import ( "fmt" "net" "runtime/pprof" + // "runtime/debug" "runtime/trace" "strconv" "strings" @@ -566,6 +567,8 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac sessVars := s.sessionVars txnTempTables := sessVars.TxnCtx.TemporaryTables if len(txnTempTables) == 0 { + fmt.Printf("commitTxn ?????????????????????????????? 
%p\n", s) + // debug.PrintStack() return txn.Commit(ctx) } @@ -2112,6 +2115,7 @@ func (s *session) checkBeforeNewTxn(ctx context.Context) error { if err != nil { return err } + fmt.Printf(".......session is....%p...\n", s) logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit", zap.Int64("schemaVersion", s.GetInfoSchema().SchemaMetaVersion()), zap.Uint64("txnStartTS", txnStartTS), diff --git a/table/tables/cache.go b/table/tables/cache.go index ebe96ee0bd917..9b9e90a02b9e5 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -15,6 +15,7 @@ package tables import ( + "fmt" "context" "sync/atomic" "time" @@ -68,12 +69,14 @@ func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) { func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { tmp := c.cacheData.Load() if tmp == nil { + fmt.Println("TryReadFromCache return nil because ... data not loaded") return nil } data := tmp.(*cacheData) if ts >= data.Start && ts < data.Lease { return data } + fmt.Println("TryReadFromCache return nil because ... ts not correct...", ts, data.Start, data.Lease) return nil } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index e1c55ab3d97af..0d55d398ecb1f 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,6 +15,7 @@ package tables_test import ( + "fmt" "testing" "time" @@ -169,18 +170,21 @@ func TestCacheCondition(t *testing.T) { // Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id > 0") + fmt.Println("======= update 1") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Update should not trigger cache. 
tk.MustExec("update t2 set v = v + 1 where id = 2") + fmt.Println("======= update 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Delete should not trigger cache. + fmt.Println("======= delete ") tk.MustExec("delete from t2 where id = 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) @@ -209,28 +213,28 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { tk.MustExec("alter table write_tmp1 cache") // Read and add read lock - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003")) // read lock should valid for i := 0; i < 10; i++ { - if tk.HasPlan("select *from write_tmp1", "UnionScan") { + if tk.HasPlan("select * from write_tmp1", "UnionScan") { break } } tk.MustExec("use test") tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") // write lock exists - require.False(t, tk.HasPlan("select *from write_tmp1", "UnionScan")) + require.False(t, tk.HasPlan("select * from write_tmp1", "UnionScan")) // wait write lock expire and check cache can be used again - for !tk.HasPlan("select *from write_tmp1", "UnionScan") { - tk.MustExec("select *from write_tmp1") + for !tk.HasPlan("select * from write_tmp1", "UnionScan") { + tk.MustExec("select * from write_tmp1") } - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) tk1.MustExec("update write_tmp1 set v = 3333 where id = 2") - for !tk.HasPlan("select *from write_tmp1", "UnionScan") { - tk.MustExec("select *from write_tmp1") + for !tk.HasPlan("select * from write_tmp1", "UnionScan") { + tk.MustExec("select * from write_tmp1") } - tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) + 
tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) } func TestCacheTableComplexRead(t *testing.T) { @@ -244,25 +248,40 @@ func TestCacheTableComplexRead(t *testing.T) { tk1.MustExec("create table complex_cache (id int primary key auto_increment, u int unique, v int)") tk1.MustExec("insert into complex_cache values" + "(5, 105, 1005), (7, 117, 1007), (9, 109, 1009)") tk1.MustExec("alter table complex_cache cache") + tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + var i int + for i = 0; i < 100; i++ { + time.Sleep(100 * time.Millisecond) + if tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + break + } + } + require.True(t, i < 10) + tk1.MustExec("begin") - tk1.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - - go func() { - tk2.MustExec("begin") - tk2.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - var i int - for i = 0; i < 10; i++ { - time.Sleep(100 * time.Millisecond) - if tk2.HasPlan("select *from complex_cache where id > 7", "UnionScan") { - break - } + // go func() { + // defer func() { + // if r := recover(); r != nil { + // fmt.Println("xxxx", r) + // } + // }() + tk2.MustExec("begin") + tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + fmt.Println("run here 1111111111111111111111") + for i = 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + if tk2.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + break } - require.True(t, i < 10) - tk2.MustExec("commit") - doneCh <- struct{}{} - }() - <-doneCh - tk1.HasPlan("select *from complex_cache where id > 7", "UnionScan") + fmt.Println("run here 2222222222222222222222222") + } + require.True(t, i < 10) + tk2.MustExec("commit") + doneCh <- struct{}{} + // }() + // <-doneCh + + tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") 
tk1.MustExec("commit") } @@ -282,10 +301,18 @@ func TestBeginSleepABA(t *testing.T) { tk1.MustExec("insert into aba values (1, 1)") tk1.MustExec("alter table aba cache") tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) + cacheUsed := false + for i := 0; i < 100; i++ { + if tk1.HasPlan("select * from aba", "UnionScan") { + cacheUsed = true + break + } + } + require.True(t, cacheUsed) // Begin, read from cache. tk1.MustExec("begin") - cacheUsed := false + cacheUsed = false for i := 0; i < 100; i++ { if tk1.HasPlan("select * from aba", "UnionScan") { cacheUsed = true diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 14b9edec3166a..ee26f48af1650 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -305,6 +305,7 @@ type sqlExec interface { type stateRemoteHandle struct { exec sqlExec + sync.Mutex } // NewStateRemote creates a StateRemote object. @@ -322,6 +323,8 @@ func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLoc } func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) { + h.Lock() + defer h.Unlock() succ := false err := h.runInTxn(ctx, func(ctx context.Context) error { lockType, lease, _, err := h.loadRow(ctx, tid) @@ -354,6 +357,8 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts } func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error { + h.Lock() + defer h.Unlock() for { retry, err := h.lockForWriteOnce(ctx, tid, now, ts) if err != nil { @@ -381,6 +386,7 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now } defer func() { if err != nil { + fmt.Println("defer lockForWriteOnce, err ==", err) h.rollbackTxn(ctx) } }() @@ -428,6 +434,7 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now } // Otherwise, the WRITE should wait for the READ lease expire. 
+ fmt.Println("rollback txn wait for read lease...") h.rollbackTxn(ctx) waitForLeaseExpire(oldReadLease, now) // And then retry change the lock to WRITE @@ -441,11 +448,14 @@ func waitForLeaseExpire(oldReadLease, now uint64) { t1 := oracle.GetTimeFromTS(oldReadLease) t2 := oracle.GetTimeFromTS(now) waitDuration := t1.Sub(t2) + fmt.Println("wait for lease expirte ===", waitDuration) time.Sleep(waitDuration) } } func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error) { + h.Lock() + defer h.Unlock() _, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? and lock_type ='READ'", ts, tid) if err != nil { return false, errors.Trace(err) @@ -456,16 +466,19 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64 func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "begin") + fmt.Printf("EEEEEEEEEEEEEEEE begin ...%p...\n", h.exec) return err } func (h *stateRemoteHandle) commitTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "commit") + fmt.Printf("EEEEEEEEEEEEEEEE commit %v...%p ...\n", err, h.exec) return err } func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error { _, err := h.execSQL(ctx, "rollback") + fmt.Println("EEEEEEEEEEEEEEEE rollback") return err } @@ -478,6 +491,7 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co err = fn(ctx) if err != nil { h.rollbackTxn(ctx) + fmt.Println("==== rollback exec error ===", err) return errors.Trace(err) } From 5fbc29ccc085a2281c1c3f1edbc783827149c572 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 19 Nov 2021 13:25:58 +0800 Subject: [PATCH 06/13] Merge master and reset unrelated change in this PR --- ddl/ddl_api.go | 69 ++++++++++++------------ domain/domain.go | 18 ++----- executor/builder.go | 52 ++++++++---------- infoschema/builder.go | 33 ++---------- meta/autoid/autoid.go | 4 -- planner/core/logical_plan_builder.go | 54 
+++++-------------- session/bootstrap.go | 28 ++-------- session/session.go | 13 ++--- table/table.go | 4 +- table/tables/cache.go | 37 +++++++------ table/tables/cache_test.go | 81 ++++++++++------------------ table/tables/tables.go | 2 +- 12 files changed, 131 insertions(+), 264 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 7f94fd8176226..76d9a786988cd 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -57,7 +57,6 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/set" - "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" ) @@ -2635,6 +2634,22 @@ func isSameTypeMultiSpecs(specs []*ast.AlterTableSpec) bool { return true } +func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error { + if !sctx.GetSessionVars().EnableChangeMultiSchema { + if len(specs) > 1 { + return errRunMultiSchemaChanges + } + if len(specs) == 1 && len(specs[0].NewColumns) > 1 && specs[0].Tp == ast.AlterTableAddColumns { + return errRunMultiSchemaChanges + } + } else { + if len(specs) > 1 && !isSameTypeMultiSpecs(specs) { + return errRunMultiSchemaChanges + } + } + return nil +} + func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) { validSpecs, err := resolveAlterTableSpec(sctx, specs) if err != nil { @@ -2646,29 +2661,26 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE") } + err = checkMultiSpecs(sctx, validSpecs) + if err != nil { + return err + } + if len(validSpecs) > 1 { - if !sctx.GetSessionVars().EnableChangeMultiSchema { + switch validSpecs[0].Tp { + case ast.AlterTableAddColumns: + err = d.AddColumns(sctx, ident, validSpecs) + case ast.AlterTableDropColumn: + err = d.DropColumns(sctx, ident, validSpecs) + case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex: + err = 
d.DropIndexes(sctx, ident, validSpecs) + default: return errRunMultiSchemaChanges } - - if isSameTypeMultiSpecs(validSpecs) { - switch validSpecs[0].Tp { - case ast.AlterTableAddColumns: - err = d.AddColumns(sctx, ident, validSpecs) - case ast.AlterTableDropColumn: - err = d.DropColumns(sctx, ident, validSpecs) - case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex: - err = d.DropIndexes(sctx, ident, validSpecs) - default: - return errRunMultiSchemaChanges - } - if err != nil { - return errors.Trace(err) - } - return nil + if err != nil { + return errors.Trace(err) } - - return errRunMultiSchemaChanges + return nil } for _, spec := range validSpecs { @@ -6612,22 +6624,7 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error) err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - if err != nil { - return errors.Trace(err) - } - - _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) - - // exec := ctx.(sqlexec.RestrictedSQLExecutor) - // stmt, err := exec.ParseWithParams(context.Background(), "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", t.Meta().ID) - // if err != nil { - // return errors.Trace(err) - // } - // _, _, err = exec.ExecRestrictedStmt(context.Background(), stmt) - if err != nil { - return errors.Trace(err) - } - return err + return errors.Trace(err) } func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error) { diff --git a/domain/domain.go b/domain/domain.go index e5af4bf5eacb7..8ed2c18e58cd4 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -94,8 +94,7 @@ type Domain struct { serverIDSession *concurrency.Session isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. - onClose func() - sysFactory func(*Domain) (pools.Resource, error) + onClose func() } // loadInfoSchema loads infoschema at startTS. 
@@ -160,7 +159,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store()).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -174,16 +173,6 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return is, false, currentSchemaVersion, nil, nil } -func (do *Domain) sysFacHack() (pools.Resource, error) { - // TODO: Here we create new sessions with sysFac in DDL, - // which will use `do` as Domain instead of call `domap.Get`. - // That's because `domap.Get` requires a lock, but before - // we initialize Domain finish, we can't require that again. - // After we remove the lazy logic of creating Domain, we - // can simplify code here. - return do.sysFactory(do) -} - func (do *Domain) fetchPolicies(m *meta.Meta) ([]*model.PolicyInfo, error) { allPolicies, err := m.ListPolicies() if err != nil { @@ -282,7 +271,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -722,7 +711,6 @@ const serverIDForStandalone = 1 // serverID for standalone deployment. // Init initializes a domain. 
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error { - do.sysFactory = sysFactory perfschema.Init() if ebd, ok := do.store.(kv.EtcdBackend); ok { var addrs []string diff --git a/executor/builder.go b/executor/builder.go index fd98291411f11..7aee99715c807 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -19,7 +19,6 @@ import ( "context" "math" "sort" - "fmt" "strconv" "strings" "sync" @@ -30,7 +29,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" - // "github.com/pingcap/log" + "github.com/pingcap/log" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor/aggfuncs" @@ -1926,8 +1925,6 @@ func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor } func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { - b.inUpdateStmt = true - fmt.Println("run here in build update === ", b.inUpdateStmt) tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) multiUpdateOnSameTable := make(map[int64]bool) for _, info := range v.TblColPosInfos { @@ -1968,6 +1965,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { if b.err != nil { return nil } + b.inUpdateStmt = true updateExec := &UpdateExec{ baseExecutor: base, OrderedList: v.OrderedList, @@ -2000,8 +1998,6 @@ func getAssignFlag(ctx sessionctx.Context, v *plannercore.Update, schemaLen int) } func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { - b.inDeleteStmt = true - fmt.Println("in delete stmt === true set!!") tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) for _, info := range v.TblColPosInfos { tblID2table[info.TblID], _ = b.is.TableByID(info.TblID) @@ -2014,6 +2010,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { if b.err != nil { return nil } + b.inDeleteStmt = true base := newBaseExecutor(b.ctx, v.Schema(), 
v.ID(), selExec) base.initCap = chunk.ZeroCapacity deleteExec := &DeleteExec{ @@ -3684,7 +3681,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return nil, err } var kvRanges []kv.KeyRange - if keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) { + if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) { // In this case we can use dynamic partition pruning. locateKey := make([]types.Datum, e.Schema().Len()) kvRanges = make([]kv.KeyRange, 0, len(lookUpContents)) @@ -3739,7 +3736,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return nil, err } var kvRanges []kv.KeyRange - if keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) { + if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) { locateKey := make([]types.Datum, e.Schema().Len()) kvRanges = make([]kv.KeyRange, 0, len(lookUpContents)) for _, content := range lookUpContents { @@ -4678,32 +4675,25 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64 b.err = errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(b.ctx.GetSessionVars().CurrentDB, tblInfo.Name)) return nil } - var ctbl table.CachedTable = tbl.(table.CachedTable) - cacheData := ctbl.TryReadFromCache(startTS) + cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS) if cacheData != nil { b.ctx.GetSessionVars().StmtCtx.ReadFromTableCache = true return cacheData - } else { - if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { - fmt.Println("what the fuc?? 
Update Lock For Read !!!!!!!!!!!!!", b.inDeleteStmt, b.inUpdateStmt) - go plannercore.XXX(context.Background(), ctbl, b.ctx.GetStore(), startTS) - } - - // go func() { - // defer func() { - // if r := recover(); r != nil { - // logutil.BgLogger().Error("panic in the recoverable goroutine", - // zap.Reflect("r", r), - // zap.Stack("stack trace")) - // } - // }() - // if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { - // err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS) - // if err != nil { - // log.Warn("Update Lock Info Error") - // } - // } - // }() } + go func() { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("panic in the recoverable goroutine", + zap.Reflect("r", r), + zap.Stack("stack trace")) + } + }() + if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { + err := tbl.(table.CachedTable).UpdateLockForRead(b.ctx.GetStore(), startTS) + if err != nil { + log.Warn("Update Lock Info Error") + } + } + }() return nil } diff --git a/infoschema/builder.go b/infoschema/builder.go index 54b408a8ade71..d3c532a2bfffb 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -20,7 +20,6 @@ import ( "sort" "strings" - "github.com/ngaut/pools" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -34,7 +33,6 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/domainutil" - "github.com/pingcap/tidb/util/sqlexec" ) // Builder builds a new InfoSchema. @@ -42,8 +40,7 @@ type Builder struct { is *infoSchema // TODO: store is only used by autoid allocators // detach allocators from storage, use passed transaction in the feature - store kv.Storage - factory func() (pools.Resource, error) + store kv.Storage } // ApplyDiff applies SchemaDiff to the new InfoSchema. 
@@ -441,7 +438,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i } } } - tbl, err := b.tableFromMeta(allocs, tblInfo) + tbl, err := tables.TableFromMeta(allocs, tblInfo) if err != nil { return nil, errors.Trace(err) } @@ -604,7 +601,7 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. } for _, di := range dbInfos { - err := b.createSchemaTablesForDB(di, b.tableFromMeta) + err := b.createSchemaTablesForDB(di, tables.TableFromMeta) if err != nil { return nil, errors.Trace(err) } @@ -625,25 +622,6 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement. return b, nil } -func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) { - ret, err := tables.TableFromMeta(alloc, tblInfo) - if err != nil { - return nil, errors.Trace(err) - } - if t, ok := ret.(table.CachedTable); ok { - var tmp pools.Resource - tmp, err = b.factory() - if err != nil { - return nil, errors.Trace(err) - } - err = t.Init(tmp.(sqlexec.SQLExecutor)) - if err != nil { - return nil, errors.Trace(err) - } - } - return ret, nil -} - type tableFromMetaFunc func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableFromMetaFunc) error { @@ -680,10 +658,9 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. 
-func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder { +func NewBuilder(store kv.Storage) *Builder { return &Builder{ - factory: factory, - store: store, + store: store, is: &infoSchema{ schemaMap: map[string]*schemaTables{}, policyMap: map[string]*model.PolicyInfo{}, diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 96d13d3bee48d..8ee99f5939a0e 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -467,10 +467,6 @@ func (alloc *allocator) GetType() AllocatorType { return alloc.allocType } -func (alloc *allocator) GetStore() kv.Storage { - return alloc.store -} - // NextStep return new auto id step according to previous step and consuming time. func NextStep(curStep int64, consumeDur time.Duration) int64 { failpoint.Inject("mockAutoIDCustomize", func(val failpoint.Value) { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index cf3e49dbad787..e1c9c11287cb2 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,11 +27,9 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" - "golang.org/x/sync/singleflight" - "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/log" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" - "go.uber.org/zap" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" @@ -3973,7 +3971,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as } // Skip storage engine check for CreateView. 
if b.capFlag&canExpandAST == 0 { - possiblePaths, err = filterPathByIsolationRead(b.ctx, possiblePaths, dbName) + possiblePaths, err = filterPathByIsolationRead(b.ctx, possiblePaths, tblName, dbName) if err != nil { return nil, err } @@ -4160,26 +4158,18 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as us.SetChildren(ds) result = us } else { - if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { - fmt.Println("llllllllalalalalalllllllllllalal Update Lock For Read !!!!!!!!!!!!!") - go XXX(ctx, cachedTable, b.ctx.GetStore(), txn.StartTS()) - } - // go func() { - // defer func() { - // if r := recover(); r != nil { - // } - // }() - // if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { - // _, err, _ := sf.Do(fmt.Sprintf("%d", tableInfo.ID), func() (interface{}, error) { - // fmt.Println("llllllllalalalalalllllllllllalal Update Lock For Read !!!!!!!!!!!!!") - // err := cachedTable.UpdateLockForRead(ctx, b.ctx.GetStore(), txn.StartTS()) - // return nil, err - // }) - // if err != nil { - // logutil.BgLogger().Warn("Update Lock Info Error", zap.Error(err)) - // } - // } - // }() + go func() { + defer func() { + if r := recover(); r != nil { + } + }() + if !b.inUpdateStmt && !b.inDeleteStmt && !b.ctx.GetSessionVars().StmtCtx.InExplainStmt { + err := cachedTable.UpdateLockForRead(b.ctx.GetStore(), txn.StartTS()) + if err != nil { + log.Warn("Update Lock Info Error") + } + } + }() } } @@ -4205,22 +4195,6 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as return result, nil } -var sf singleflight.Group - -func XXX(ctx context.Context, tbl table.CachedTable, store kv.Storage, startTS uint64) { - defer func() { - if r := recover(); r != nil { - } - }() - _, err, _ := sf.Do(fmt.Sprintf("%d", tbl.Meta().ID), func() (interface{}, error) { - err := tbl.UpdateLockForRead(ctx, store, startTS) - return nil, err - }) - if err != nil { - 
logutil.BgLogger().Warn("Update Lock Info Error", zap.Error(err)) - } -} - func (b *PlanBuilder) timeRangeForSummaryTable() QueryTimeRange { const defaultSummaryDuration = 30 * time.Minute hints := b.TableHints() diff --git a/session/bootstrap.go b/session/bootstrap.go index c4743c47b4d22..573c17bdaaf2e 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -357,14 +357,6 @@ const ( last_analyzed_at TIMESTAMP, PRIMARY KEY (table_id, column_id) CLUSTERED );` - // CreateTableCacheMetaTable stores the cached table meta lock information. - CreateTableCacheMetaTable = `CREATE TABLE IF NOT EXISTS mysql.table_cache_meta ( - tid int(11) NOT NULL DEFAULT 0, - lock_type enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE', - lease bigint(20) NOT NULL DEFAULT 0, - oldReadLease bigint(20) NOT NULL DEFAULT 0, - PRIMARY KEY (tid) - );` ) // bootstrap initiates system DB for a store. @@ -536,13 +528,11 @@ const ( version77 = 77 // version78 updates mysql.stats_buckets.lower_bound, mysql.stats_buckets.upper_bound and mysql.stats_histograms.last_analyze_pos from BLOB to LONGBLOB. version78 = 78 - // version79 adds the mysql.table_cache_meta table - version79 = 79 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. 
// please make sure this is the largest version -var currentBootstrapVersion int64 = version79 +var currentBootstrapVersion int64 = version78 var ( bootstrapVersion = []func(Session, int64){ @@ -624,7 +614,6 @@ var ( upgradeToVer76, upgradeToVer77, upgradeToVer78, - upgradeToVer79, } ) @@ -1623,13 +1612,6 @@ func upgradeToVer78(s Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms MODIFY last_analyze_pos LONGBLOB DEFAULT NULL") } -func upgradeToVer79(s Session, ver int64) { - if ver >= version79 { - return - } - doReentrantDDL(s, CreateTableCacheMetaTable) -} - func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -1706,14 +1688,12 @@ func doDDLWorks(s Session) { mustExecute(s, CreateSchemaIndexUsageTable) // Create stats_fm_sketch table. mustExecute(s, CreateStatsFMSketchTable) - // Create global_grants. + // Create global_grants mustExecute(s, CreateGlobalGrantsTable) - // Create capture_plan_baselines_blacklist. + // Create capture_plan_baselines_blacklist mustExecute(s, CreateCapturePlanBaselinesBlacklist) - // Create column_stats_usage table. + // Create column_stats_usage table mustExecute(s, CreateColumnStatsUsageTable) - // Create table_cache_meta table. - mustExecute(s, CreateTableCacheMetaTable) } // doDMLWorks executes DML statements in bootstrap stage. 
diff --git a/session/session.go b/session/session.go index 35a017c87d76e..4494fa984648c 100644 --- a/session/session.go +++ b/session/session.go @@ -26,7 +26,6 @@ import ( "fmt" "net" "runtime/pprof" - // "runtime/debug" "runtime/trace" "strconv" "strings" @@ -567,8 +566,6 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac sessVars := s.sessionVars txnTempTables := sessVars.TxnCtx.TemporaryTables if len(txnTempTables) == 0 { - fmt.Printf("commitTxn ?????????????????????????????? %p\n", s) - // debug.PrintStack() return txn.Commit(ctx) } @@ -1118,6 +1115,10 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { return sv.Value, nil } } + // It might have been written from an earlier TiDB version, so we should run the validation function + // To normalize the value to be safe for this version of TiDB. This also happens for session scoped + // variables in loadCommonGlobalVariablesIfNeeded -> SetSystemVarWithRelaxedValidation + sysVar = sv.ValidateWithRelaxedValidation(s.GetSessionVars(), sysVar, variable.ScopeGlobal) return sysVar, nil } @@ -2115,7 +2116,6 @@ func (s *session) checkBeforeNewTxn(ctx context.Context) error { if err != nil { return err } - fmt.Printf(".......session is....%p...\n", s) logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit", zap.Int64("schemaVersion", s.GetInfoSchema().SchemaMetaVersion()), zap.Uint64("txnStartTS", txnStartTS), @@ -2448,11 +2448,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { runInBootstrapSession(store, upgrade) } - _, err := domap.Get(store) - if err != nil { - return nil, err - } - se, err := createSession(store) if err != nil { return nil, err diff --git a/table/table.go b/table/table.go index 1e2d4f4614538..f39e9b2d9fa8d 100644 --- a/table/table.go +++ b/table/table.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/dbterror" - 
"github.com/pingcap/tidb/util/sqlexec" ) // Type is used to distinguish between different tables that store data in different ways. @@ -253,11 +252,10 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table - Init(exec sqlexec.SQLExecutor) error // TryReadFromCache checks if the cache table is readable. TryReadFromCache(ts uint64) kv.MemBuffer // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table - UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error + UpdateLockForRead(store kv.Storage, ts uint64) error } diff --git a/table/tables/cache.go b/table/tables/cache.go index 9b9e90a02b9e5..ba8786a65bae3 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -15,7 +15,6 @@ package tables import ( - "fmt" "context" "sync/atomic" "time" @@ -26,12 +25,12 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" ) var ( + _ table.Table = &cachedTable{} _ table.CachedTable = &cachedTable{} ) @@ -69,35 +68,35 @@ func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) { func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { tmp := c.cacheData.Load() if tmp == nil { - fmt.Println("TryReadFromCache return nil because ... data not loaded") return nil } data := tmp.(*cacheData) if ts >= data.Start && ts < data.Lease { return data } - fmt.Println("TryReadFromCache return nil because ... 
ts not correct...", ts, data.Start, data.Lease) return nil } +var mockStateRemote = struct { + Ch chan remoteTask + Data *mockStateRemoteData +}{} + // NewCachedTable creates a new CachedTable Instance -func newCachedTable(tbl *TableCommon) (table.Table, error) { +func NewCachedTable(tbl *TableCommon) (table.Table, error) { + if mockStateRemote.Data == nil { + mockStateRemote.Data = newMockStateRemoteData() + mockStateRemote.Ch = make(chan remoteTask, 100) + go mockRemoteService(mockStateRemote.Data, mockStateRemote.Ch) + } ret := &cachedTable{ TableCommon: *tbl, + handle: &mockStateRemoteHandle{mockStateRemote.Ch}, } return ret, nil } -func (c *cachedTable) Init(exec sqlexec.SQLExecutor) error { - raw, ok := exec.(sqlExec) - if !ok { - return errors.New("Need sqlExec rather than sqlexec.SQLExecutor") - } - c.handle = NewStateRemote(raw) - return nil -} - func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, error) { buffer, err := newMemBuffer(store) if err != nil { @@ -139,11 +138,11 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) return buffer, startTS, nil } -func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error { +func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error { // Load data from original table and the update lock information. tid := c.Meta().ID lease := leaseFromTS(ts) - succ, err := c.handle.LockForRead(ctx, tid, ts, lease) + succ, err := c.handle.LockForRead(tid, ts, lease) if err != nil { return errors.Trace(err) } @@ -170,7 +169,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. 
return nil, err } now := txn.StartTS() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) if err != nil { return nil, errors.Trace(err) } @@ -184,7 +183,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, return err } now := txn.StartTS() - err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } @@ -198,7 +197,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } now := txn.StartTS() - err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now)) + err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now)) if err != nil { return errors.Trace(err) } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index 0d55d398ecb1f..e1c55ab3d97af 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -15,7 +15,6 @@ package tables_test import ( - "fmt" "testing" "time" @@ -170,21 +169,18 @@ func TestCacheCondition(t *testing.T) { // Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id > 0") - fmt.Println("======= update 1") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id = 2") - fmt.Println("======= update 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) } // Contains PointGet Delete should not trigger cache. 
- fmt.Println("======= delete ") tk.MustExec("delete from t2 where id = 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) @@ -213,28 +209,28 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { tk.MustExec("alter table write_tmp1 cache") // Read and add read lock - tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", + tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003")) // read lock should valid for i := 0; i < 10; i++ { - if tk.HasPlan("select * from write_tmp1", "UnionScan") { + if tk.HasPlan("select *from write_tmp1", "UnionScan") { break } } tk.MustExec("use test") tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") // write lock exists - require.False(t, tk.HasPlan("select * from write_tmp1", "UnionScan")) + require.False(t, tk.HasPlan("select *from write_tmp1", "UnionScan")) // wait write lock expire and check cache can be used again - for !tk.HasPlan("select * from write_tmp1", "UnionScan") { - tk.MustExec("select * from write_tmp1") + for !tk.HasPlan("select *from write_tmp1", "UnionScan") { + tk.MustExec("select *from write_tmp1") } - tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) + tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) tk1.MustExec("update write_tmp1 set v = 3333 where id = 2") - for !tk.HasPlan("select * from write_tmp1", "UnionScan") { - tk.MustExec("select * from write_tmp1") + for !tk.HasPlan("select *from write_tmp1", "UnionScan") { + tk.MustExec("select *from write_tmp1") } - tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) + tk.MustQuery("select *from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) } func TestCacheTableComplexRead(t *testing.T) { @@ -248,40 +244,25 @@ func TestCacheTableComplexRead(t *testing.T) { tk1.MustExec("create table complex_cache (id int primary 
key auto_increment, u int unique, v int)") tk1.MustExec("insert into complex_cache values" + "(5, 105, 1005), (7, 117, 1007), (9, 109, 1009)") tk1.MustExec("alter table complex_cache cache") - tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - var i int - for i = 0; i < 100; i++ { - time.Sleep(100 * time.Millisecond) - if tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") { - break - } - } - require.True(t, i < 10) - tk1.MustExec("begin") - // go func() { - // defer func() { - // if r := recover(); r != nil { - // fmt.Println("xxxx", r) - // } - // }() - tk2.MustExec("begin") - tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) - fmt.Println("run here 1111111111111111111111") - for i = 0; i < 10; i++ { - time.Sleep(100 * time.Millisecond) - if tk2.HasPlan("select * from complex_cache where id > 7", "UnionScan") { - break + tk1.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + + go func() { + tk2.MustExec("begin") + tk2.MustQuery("select *from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + var i int + for i = 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + if tk2.HasPlan("select *from complex_cache where id > 7", "UnionScan") { + break + } } - fmt.Println("run here 2222222222222222222222222") - } - require.True(t, i < 10) - tk2.MustExec("commit") - doneCh <- struct{}{} - // }() - // <-doneCh - - tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") + require.True(t, i < 10) + tk2.MustExec("commit") + doneCh <- struct{}{} + }() + <-doneCh + tk1.HasPlan("select *from complex_cache where id > 7", "UnionScan") tk1.MustExec("commit") } @@ -301,18 +282,10 @@ func TestBeginSleepABA(t *testing.T) { tk1.MustExec("insert into aba values (1, 1)") tk1.MustExec("alter table aba cache") tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) - cacheUsed := false - for i := 0; i < 100; i++ { - if 
tk1.HasPlan("select * from aba", "UnionScan") { - cacheUsed = true - break - } - } - require.True(t, cacheUsed) // Begin, read from cache. tk1.MustExec("begin") - cacheUsed = false + cacheUsed := false for i := 0; i < 100; i++ { if tk1.HasPlan("select * from aba", "UnionScan") { cacheUsed = true diff --git a/table/tables/tables.go b/table/tables/tables.go index 9d263f99c0516..ecb7e23137745 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -146,7 +146,7 @@ func TableFromMeta(allocs autoid.Allocators, tblInfo *model.TableInfo) (table.Ta return nil, err } if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable { - return newCachedTable(&t) + return NewCachedTable(&t) } return &t, nil } From d3c8c78aeb67a724c785bc1ea5d2cdca37748066 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 19 Nov 2021 14:03:33 +0800 Subject: [PATCH 07/13] make golint happy --- table/tables/state_remote.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index f6be4360cf213..86bdbaa2934b7 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" @@ -386,7 +387,7 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now } defer func() { if err != nil { - h.rollbackTxn(ctx) + terror.Log(h.rollbackTxn(ctx)) } }() @@ -433,7 +434,7 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now } // Otherwise, the WRITE should wait for the READ lease expire. 
- h.rollbackTxn(ctx) + terror.Log(h.rollbackTxn(ctx)) waitForLeaseExpire(oldReadLease, now) // And then retry change the lock to WRITE return true, nil @@ -484,7 +485,7 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co err = fn(ctx) if err != nil { - h.rollbackTxn(ctx) + terror.Log(h.rollbackTxn(ctx)) return errors.Trace(err) } From 468328f599e31e730e64faf2f9fa245d22428029 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Nov 2021 15:07:20 +0800 Subject: [PATCH 08/13] address comment --- table/tables/state_remote.go | 101 ++++++++++++++++------------------- 1 file changed, 46 insertions(+), 55 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 86bdbaa2934b7..5c839a704661e 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -361,14 +361,15 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts h.Lock() defer h.Unlock() for { - retry, err := h.lockForWriteOnce(ctx, tid, now, ts) + waitAndRetry, err := h.lockForWriteOnce(ctx, tid, now, ts) if err != nil { return err } - if !retry { + if waitAndRetry == 0 { break } + time.Sleep(waitAndRetry) store := h.exec.GetStore() o := store.GetOracle() newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope}) @@ -380,75 +381,65 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts return nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) ( /*retry*/ bool, error) { - err := h.beginTxn(ctx) - if err != nil { - return false, errors.Trace(err) - } - defer func() { +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) (waitAndRetry time.Duration, err error) { + err = h.runInTxn(ctx, func(ctx context.Context) error { + lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { - terror.Log(h.rollbackTxn(ctx)) + return errors.Trace(err) } - }() - - 
lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) - if err != nil { - return false, errors.Trace(err) - } - // The lease is outdated, so lock is invalid, clear orphan lock of any kind. - if now > lease { - if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return false, errors.Trace(err) + // The lease is outdated, so lock is invalid, clear orphan lock of any kind. + if now > lease { + if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + return nil } - return false, h.commitTxn(ctx) - } - // The lease is valid. - switch lockType { - case CachedTableLockNone: - if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return false, errors.Trace(err) - } - return false, h.commitTxn(ctx) - case CachedTableLockRead: - // Change from READ to INTEND - if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil { - return false, errors.Trace(err) - } - if err = h.commitTxn(ctx); err != nil { - return false, errors.Trace(err) - } + // The lease is valid. + switch lockType { + case CachedTableLockNone: + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } + case CachedTableLockRead: + // Change from READ to INTEND + if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil { + return errors.Trace(err) + } + if err = h.commitTxn(ctx); err != nil { + return errors.Trace(err) + } - // Wait for lease to expire, and then retry. - waitForLeaseExpire(oldReadLease, now) - return true, nil - case CachedTableLockIntend, CachedTableLockWrite: - // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. 
- if now > oldReadLease { - if lockType == CachedTableLockIntend { - if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { - return false, errors.Trace(err) + // Wait for lease to expire, and then retry. + waitAndRetry = waitForLeaseExpire(oldReadLease, now) + case CachedTableLockIntend, CachedTableLockWrite: + // `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here. + if now > oldReadLease { + if lockType == CachedTableLockIntend { + if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil { + return errors.Trace(err) + } } + return nil } - return false, h.commitTxn(ctx) + // Otherwise, the WRITE should wait for the READ lease expire. + // And then retry changing the lock to WRITE + waitAndRetry = waitForLeaseExpire(oldReadLease, now) } + return nil + }) - // Otherwise, the WRITE should wait for the READ lease expire. - terror.Log(h.rollbackTxn(ctx)) - waitForLeaseExpire(oldReadLease, now) - // And then retry change the lock to WRITE - return true, nil - } - return false, errors.New("should never run here") + return } -func waitForLeaseExpire(oldReadLease, now uint64) { +func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { if oldReadLease >= now { t1 := oracle.GetTimeFromTS(oldReadLease) t2 := oracle.GetTimeFromTS(now) waitDuration := t1.Sub(t2) - time.Sleep(waitDuration) + return waitDuration } + return 0 } func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, ts uint64) (bool, error) { From 9a855d38f82927506a132a51f519062546925c80 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Nov 2021 17:23:50 +0800 Subject: [PATCH 09/13] address comment --- table/tables/state_remote.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 5c839a704661e..fdd146a1d01d5 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -348,8 +348,10 @@ func (h *stateRemoteHandle) LockForRead(ctx 
context.Context, tid int64, now, ts return nil } succ = true - if err := h.updateRow(ctx, tid, "READ", ts); err != nil { - return errors.Trace(err) + if ts > lease { // Note the check, don't decrease lease value! + if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + return errors.Trace(err) + } } return nil From 2fb67a2d8dfc20a020e2567930912363a591519e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Nov 2021 19:31:10 +0800 Subject: [PATCH 10/13] add comment for LockForRead --- table/tables/state_remote.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index fdd146a1d01d5..5ea636b325b3a 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -61,8 +61,11 @@ type StateRemote interface { // Load obtain the corresponding lock type and lease value according to the tableID Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) - // LockForRead try to add a read lock to the table with the specified tableID - LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) + // LockForRead try to add a read lock to the table with the specified tableID. + // If this operation succeed, according to the protocol, the TiKV data will not be + // modified until the lease expire. It's safe for the caller to load the table data, + // cache and use the data. + LockForRead(ctx context.Context, tid int64, now, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID LockForWrite(ctx context.Context, tid int64, now, ts uint64) error @@ -323,6 +326,8 @@ func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLoc return lockType, lease, err } +// LockForRead tries to lock the table; if this operation succeeds, the remote data +// is "read locked" and will not be modified, according to the protocol, until the lease expires.
func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) { h.Lock() defer h.Unlock() From c75d373bb51c51bff5a61bf376ae5aeeabd9ffe4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Nov 2021 20:41:08 +0800 Subject: [PATCH 11/13] add comment for `now` parameter of LockForRead --- table/tables/state_remote.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 5ea636b325b3a..8a059a74347f9 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -65,6 +65,10 @@ type StateRemote interface { // If this operation succeed, according to the protocol, the TiKV data will not be // modified until the lease expire. It's safe for the caller to load the table data, // cache and use the data. + // The parameter `now` means the current tso. Because the tso is obtained from PD, + // its value on the TiDB side always lags behind the real one, but this doesn't matter: + // `now` is only used to clean up the orphan lock, and as long as it's smaller + // than the real one, the correctness of the algorithm is not violated.
LockForRead(ctx context.Context, tid int64, now, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID From 434772a24b536af4c6a53308825e7c5d07a5047d Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 23 Nov 2021 16:00:23 +0800 Subject: [PATCH 12/13] Merge branch 'master' into state-remote --- executor/aggregate_test.go | 57 -------------------------------------- 1 file changed, 57 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index f0b41f245c61c..2a9360421f704 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1469,63 +1469,6 @@ func TestIssue20658(t *testing.T) { } } -func TestRandomPanicAggConsume(t *testing.T) { - store, clean := testkit.CreateMockStore(t) - defer clean() - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set @@tidb_max_chunk_size=32") - tk.MustExec("set @@tidb_init_chunk_size=1") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int)") - for i := 0; i <= 1000; i++ { - tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i)) - } - - fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic" - require.NoError(t, failpoint.Enable(fpName, "5%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")")) - defer func() { - require.NoError(t, failpoint.Disable(fpName)) - }() - - // Test 10 times panic for each AggExec. - var res sqlexec.RecordSet - for i := 1; i <= 10; i++ { - var err error - for err == nil { - // Test paralleled hash agg. - res, err = tk.Exec("select /*+ HASH_AGG() */ count(a) from t group by a") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) - } - } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") - - err = nil - for err == nil { - // Test unparalleled hash agg. 
- res, err = tk.Exec("select /*+ HASH_AGG() */ count(distinct a) from t") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) - } - } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") - - err = nil - for err == nil { - // Test stream agg. - res, err = tk.Exec("select /*+ STREAM_AGG() */ count(a) from t") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) - } - } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") - } -} - func TestIssue23277(t *testing.T) { t.Parallel() store, clean := testkit.CreateMockStore(t) From ccba17d793e8bcacf150557a3d13eac88f345ec6 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 24 Nov 2021 14:26:48 +0800 Subject: [PATCH 13/13] address comment --- table/tables/state_remote.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index 5c984c4710e2e..938648f458c7d 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -468,10 +468,6 @@ func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil { return errors.Trace(err) } - if err = h.commitTxn(ctx); err != nil { - return errors.Trace(err) - } - // Wait for lease to expire, and then retry. waitAndRetry = waitForLeaseExpire(oldReadLease, now) case CachedTableLockIntend, CachedTableLockWrite: