Skip to content

Commit

Permalink
*: add metadata lock when using the plan cache (pingcap#51897) (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and wjhuang2016 committed Nov 7, 2024
1 parent 8b15640 commit d61eade
Show file tree
Hide file tree
Showing 16 changed files with 1,664 additions and 14 deletions.
3 changes: 2 additions & 1 deletion ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ go_test(
],
embed = [":ingest"],
flaky = True,
shard_count = 13,
shard_count = 15,
deps = [
"//config",
"//ddl/internal/session",
"//ddl/util/callback",
"//errno",
"//kv",
"//parser/model",
"//server",
"//sessionctx",
"//testkit",
"//tests/realtikvtest",
Expand Down
100 changes: 100 additions & 0 deletions ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ package ingest_test
import (
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/ddl/util/callback"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
Expand Down Expand Up @@ -211,3 +214,100 @@ func TestAddIndexIngestTimezone(t *testing.T) {
tk.MustExec("alter table t add index idx(t);")
tk.MustExec("admin check table t;")
}

func TestMDLPreparePlanCacheExecute(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer injectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'update t set a = ? where a = ?';`)
tk.MustExec(`set @a = 1, @b = 3;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)

tk.MustExec("begin")

ch := make(chan struct{})

var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ch
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

tk.MustQuery("select * from t2")
tk.MustExec(`set @a = 2, @b=4;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
// The plan is from cache, the metadata lock should be added to block the DDL.
ch <- struct{}{}

time.Sleep(5 * time.Second)

tk.MustExec("commit")

wg.Wait()

tk.MustExec("admin check table t")
}

func TestMDLPreparePlanCacheExecute2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer injectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'select * from t where a = ?';`)
tk.MustExec(`set @a = 1;`)
tk.MustExec(`execute stmt_test_1 using @a;`)

tk.MustExec("begin")
tk.MustQuery("select * from t2")

var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

wg.Wait()

tk.MustExec(`set @a = 2;`)
tk.MustExec(`execute stmt_test_1 using @a;`)
// The plan should not be from cache because the schema has changed.
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec("commit")

tk.MustExec("admin check table t")
}
1 change: 1 addition & 0 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ func dumpPlanReplayerExplain(ctx sessionctx.Context, zw *zip.Writer, task *PlanR
return err
}

// extractTableNames extracts table names from the given stmts.
func extractTableNames(ctx context.Context, sctx sessionctx.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2080,6 +2080,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID.Store(vars.ConnectionID)
vars.StmtCtx.TableStats = make(map[int64]interface{})
sc.MDLRelatedTableIDs = make(map[int64]int64)

isAnalyze := false
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
Expand Down
4 changes: 2 additions & 2 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
}
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.ctx, true, stmt0.Text(), stmt0, nil)
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.ctx, true, stmt0.Text(), stmt0, sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema())
if err != nil {
return err
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.ctx.GetSessionVars().EnablePreparedPlanCache {
bindSQL, _ := plannercore.GetBindSQL4PlanCache(e.ctx, preparedObj)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion,
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), preparedObj.RelateVersion)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ func TestBatchInsertDelete(t *testing.T) {
kv.TxnTotalSizeLimit.Store(originLimit)
}()
// Set the limitation to a small value, make it easier to reach the limitation.
kv.TxnTotalSizeLimit.Store(6000)
kv.TxnTotalSizeLimit.Store(7000)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
2 changes: 2 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,8 @@ func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
return errors.Trace(err)
}

tableInfo.Revision++

data, err := json.Marshal(tableInfo)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 3 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ type TableInfo struct {
ExchangePartitionInfo *ExchangePartitionInfo `json:"exchange_partition_info"`

TTLInfo *TTLInfo `json:"ttl_info"`

// Revision is per table schema's version, it will be increased when the schema changed.
Revision uint64 `json:"revision"`
}

// SepAutoInc decides whether _rowid and auto_increment id use separate allocator.
Expand Down
14 changes: 14 additions & 0 deletions pkg/planner/core/tests/prepare/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "prepare_test",
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 21,
deps = [
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit d61eade

Please sign in to comment.