Skip to content

Commit

Permalink
*: add metadata lock when using the plan cache (#51897) (#52957)
Browse files Browse the repository at this point in the history
close #51407
  • Loading branch information
ti-chi-bot authored May 31, 2024
1 parent 39ea2b3 commit 84e2926
Show file tree
Hide file tree
Showing 15 changed files with 224 additions and 22 deletions.
3 changes: 2 additions & 1 deletion pkg/ddl/tests/metadatalock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ go_test(
"mdl_test.go",
],
flaky = True,
shard_count = 34,
shard_count = 36,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/ingest/testutil",
"//pkg/errno",
"//pkg/server",
"//pkg/testkit",
Expand Down
98 changes: 98 additions & 0 deletions pkg/ddl/tests/metadatalock/mdl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/failpoint"
ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
mysql "github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -895,6 +896,103 @@ func TestMDLPreparePlanCacheInvalid(t *testing.T) {
tk.MustQuery(`execute stmt_test_1 using @a;`).Check(testkit.Rows("1 <nil>", "2 <nil>", "3 <nil>", "4 <nil>"))
}

func TestMDLPreparePlanCacheExecute(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'update t set a = ? where a = ?';`)
tk.MustExec(`set @a = 1, @b = 3;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)

tk.MustExec("begin")

ch := make(chan struct{})

var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ch
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

tk.MustQuery("select * from t2")
tk.MustExec(`set @a = 2, @b=4;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
// The plan is from cache, the metadata lock should be added to block the DDL.
ch <- struct{}{}

time.Sleep(5 * time.Second)

tk.MustExec("commit")

wg.Wait()

tk.MustExec("admin check table t")
}

func TestMDLPreparePlanCacheExecute2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'select * from t where a = ?';`)
tk.MustExec(`set @a = 1;`)
tk.MustExec(`execute stmt_test_1 using @a;`)

tk.MustExec("begin")
tk.MustQuery("select * from t2")

var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

wg.Wait()

tk.MustExec(`set @a = 2;`)
tk.MustExec(`execute stmt_test_1 using @a;`)
// The plan should not be from cache because the schema has changed.
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec("commit")

tk.MustExec("admin check table t")
}

func TestMDLDisable2Enable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func dumpPlanReplayerExplain(ctx sessionctx.Context, zw *zip.Writer, task *PlanR
return err
}

// extractTableNames extracts table names from the given stmts.
func extractTableNames(ctx context.Context, sctx sessionctx.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID.Store(vars.ConnectionID)
vars.StmtCtx.TableStats = make(map[int64]interface{})
sc.MDLRelatedTableIDs = make(map[int64]int64)

isAnalyze := false
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (e *PrepareExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}
}
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.Ctx(), true, stmt0.Text(), stmt0, nil)
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.Ctx(), true, stmt0.Text(), stmt0, sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema())
if err != nil {
return err
}
Expand Down Expand Up @@ -207,7 +208,7 @@ func (e *DeallocateExec) Next(context.Context, *chunk.Chunk) error {
if e.Ctx().GetSessionVars().EnablePreparedPlanCache {
bindSQL, _ := plannercore.GetBindSQL4PlanCache(e.Ctx(), preparedObj)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion,
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), preparedObj.RelateVersion)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func TestBatchInsertDelete(t *testing.T) {
kv.TxnTotalSizeLimit.Store(originLimit)
}()
// Set the limitation to a small value, make it easier to reach the limitation.
kv.TxnTotalSizeLimit.Store(6000)
kv.TxnTotalSizeLimit.Store(7000)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
2 changes: 2 additions & 0 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,8 @@ func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
return errors.Trace(err)
}

tableInfo.Revision++

data, err := json.Marshal(tableInfo)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ type TableInfo struct {
ExchangePartitionInfo *ExchangePartitionInfo `json:"exchange_partition_info"`

TTLInfo *TTLInfo `json:"ttl_info"`

// Revision is per table schema's version, it will be increased when the schema changed.
Revision uint64 `json:"revision"`
}

// TableNameInfo provides meta data describing a table name info.
Expand Down
36 changes: 31 additions & 5 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,33 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
return errors.Trace(err)
}

// step 3: check schema version
if stmtAst.SchemaVersion != is.SchemaMetaVersion() {
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
return ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
stmt.tbls[i] = tblByName
stmt.RelateVersion[tblByName.Meta().ID] = tblByName.Meta().Revision
}
newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx, stmt.dbName[i], stmt.tbls[i], is)
if err != nil {
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
}

// step 4: check schema version
if schemaNotMatch || stmt.PreparedAst.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
// Cached plan in prepared struct does NOT have a "cache key" with
Expand All @@ -125,7 +150,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
stmtAst.SchemaVersion = is.SchemaMetaVersion()
}

// step 4: handle expiration
// step 5: handle expiration
// If the lastUpdateTime less than expiredTimeStamp4PC,
// it means other sessions have executed 'admin flush instance plan_cache'.
// So we need to clear the current session's plan cache.
Expand All @@ -136,6 +161,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
stmtAst.CachedPlan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}

return nil
}

Expand Down Expand Up @@ -188,7 +214,7 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
}
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), stmt.StmtText,
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), stmt.RelateVersion); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -333,7 +359,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
if cacheKey, err = NewPlanCacheKey(sessVars, stmt.StmtText, stmt.StmtDB,
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), stmt.RelateVersion); err != nil {
return nil, nil, err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
Expand Down
Loading

0 comments on commit 84e2926

Please sign in to comment.