From e932b1a696bbc69fec1d4320fa26b5240dd089e7 Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 24 Jun 2024 11:19:31 +0800 Subject: [PATCH 1/3] *: fix a bug about using the wrong schame in plan cache. --- pkg/planner/core/plan_cache.go | 4 +- .../testserverclient/server_client.go | 182 ++++++++++++++++++ pkg/server/tests/commontest/tidb_test.go | 5 + pkg/server/tests/servertestkit/testkit.go | 32 ++- 4 files changed, 220 insertions(+), 3 deletions(-) diff --git a/pkg/planner/core/plan_cache.go b/pkg/planner/core/plan_cache.go index 9f5789f49e600..d4b2b6fce296b 100644 --- a/pkg/planner/core/plan_cache.go +++ b/pkg/planner/core/plan_cache.go @@ -107,7 +107,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep // step 3: add metadata lock and check each table's schema version schemaNotMatch := false for i := 0; i < len(stmt.dbName); i++ { - _, ok := is.TableByID(stmt.tbls[i].Meta().ID) + tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID) if !ok { tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name) if err != nil { @@ -122,7 +122,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep schemaNotMatch = true continue } - if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision { + if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || tbl.Meta().Revision != newTbl.Meta().Revision { schemaNotMatch = true } stmt.tbls[i] = newTbl diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index fa6d42f42935e..31d396c7eef2e 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -37,9 +37,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/ddl/util/callback" + "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/parser/model" tmysql "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/server" "github.com/pingcap/tidb/pkg/testkit" @@ -2788,4 +2791,183 @@ func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) { }) } +func (cli *TestServerClient) getNewDB(t *testing.T, overrider configOverrider) *testkit.DBTestKit { + db, err := sql.Open("mysql", cli.GetDSN(overrider)) + require.NoError(t, err) + + return testkit.NewDBTestKit(t, db) +} + +func MustExec(t *testing.T, ctx context.Context, conn *sql.Conn, sql string) { + _, err := conn.QueryContext(ctx, sql) + require.NoError(t, err) +} + +func MustQuery(t *testing.T, ctx context.Context, cli *TestServerClient, conn *sql.Conn, sql string) { + rs, err := conn.QueryContext(ctx, sql) + require.NoError(t, err) + if rs != nil { + cli.Rows(t, rs) + rs.Close() + } +} + +type sqlWithErr struct { + sql string + expectErr error + stmt *sql.Stmt +} + +type expectQuery struct { + sql string + rows []string +} + +func (cli *TestServerClient) RunTestIssue53634(t *testing.T, dom *domain.Domain) { + cli.RunTests(t, func(config *mysql.Config) { + config.MaxAllowedPacket = 1024 + }, func(dbt *testkit.DBTestKit) { + ctx := context.Background() + + conn, err := dbt.GetDB().Conn(ctx) + require.NoError(t, err) + MustExec(t, ctx, conn, "create database test_db_state default charset utf8 default collate utf8_bin") + MustExec(t, ctx, conn, "use test_db_state") + MustExec(t, ctx, conn, `CREATE TABLE stock ( + a int NOT NULL, + b char(30) NOT NULL, + c int, + d char(64), + PRIMARY KEY(a,b) +) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment'; +`) + MustExec(t, ctx, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')") + MustExec(t, ctx, conn, "alter table stock add column cct_1 int default 10") + MustExec(t, ctx, conn, "alter table stock modify cct_1 json") + MustExec(t, ctx, conn, "alter table stock add column adc_1 smallint") + defer MustExec(t, ctx, conn, "drop database test_db_state") + + sqls := make([]sqlWithErr, 5) + sqls[0] = sqlWithErr{"begin", nil, nil} + sqls[1] = sqlWithErr{"SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE", nil, nil} + sqls[2] = sqlWithErr{"UPDATE stock SET c = ? WHERE a= ? AND b = 'a'", nil, nil} + sqls[3] = sqlWithErr{"UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'", nil, nil} + sqls[4] = sqlWithErr{"commit", nil, nil} + dropColumnSQL := "alter table stock drop column cct_1" + query := &expectQuery{sql: "select * from stock;", rows: []string{"1 a 101 x \n2 b 102 z "}} + runTestInSchemaState(t, conn, cli, dom, model.StateWriteReorganization, true, dropColumnSQL, sqls, query) + }) +} + +func runTestInSchemaState( + t *testing.T, + conn *sql.Conn, + cli *TestServerClient, + dom *domain.Domain, + state model.SchemaState, + isOnJobUpdated bool, + dropColumnSQL string, + sqlWithErrs []sqlWithErr, + expectQuery *expectQuery, +) { + ctx := context.Background() + MustExec(t, ctx, conn, "use test_db_state") + + callback := &callback.TestDDLCallback{Do: dom} + prevState := model.StateNone + var checkErr error + dbt := cli.getNewDB(t, func(config *mysql.Config) { + config.MaxAllowedPacket = 1024 + }) + conn1, err := dbt.GetDB().Conn(ctx) + require.NoError(t, err) + defer func() { + err := dbt.GetDB().Close() + require.NoError(t, err) + }() + MustExec(t, ctx, conn1, "use test_db_state") + + for i, sqlWithErr := range sqlWithErrs { + // Start the test txn. + // Step 1: begin(when i = 0). + if i == 0 || i == len(sqlWithErrs)-1 { + sqlWithErr := sqlWithErrs[i] + MustExec(t, ctx, conn1, sqlWithErr.sql) + } else { + // Step 2: prepare stmts. + // SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE + // UPDATE stock SET c = ? WHERE a= ? AND b = 'a' + // UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b' + stmt, err := conn1.PrepareContext(ctx, sqlWithErr.sql) + require.NoError(t, err) + sqlWithErr.stmt = stmt + sqlWithErrs[i] = sqlWithErr + } + } + + // Step 3: begin. + sqlWithErr := sqlWithErrs[0] + MustExec(t, ctx, conn1, sqlWithErr.sql) + + prevState = model.StateNone + state = model.StateWriteOnly + cbFunc1 := func(job *model.Job) { + if jobStateOrLastSubJobState(job) == prevState || checkErr != nil { + return + } + prevState = jobStateOrLastSubJobState(job) + if prevState != state { + return + } + // Step 4: exec stmts in write-only state (dropping a colum). + // SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE, args:(1,"a"),(2,"b") + // UPDATE stock SET c = ? WHERE a= ? AND b = 'a', args:(100+1, 1) + // UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b', args:(100+2, 2) + // commit. + sqls := sqlWithErrs[1:] + for i, sqlWithErr := range sqls { + if i == 0 { + _, err = sqlWithErr.stmt.ExecContext(ctx, 1, "a", 2, "b") + require.NoError(t, err) + } else if i == 1 || i == 2 { + _, err = sqlWithErr.stmt.ExecContext(ctx, 100+i, i) + require.NoError(t, err) + } else { + MustQuery(t, ctx, cli, conn1, sqlWithErr.sql) + } + } + } + if isOnJobUpdated { + callback.OnJobUpdatedExported.Store(&cbFunc1) + } else { + callback.OnJobRunBeforeExported = cbFunc1 + } + d := dom.DDL() + originalCallback := d.GetHook() + d.SetHook(callback) + MustExec(t, ctx, conn, dropColumnSQL) + require.NoError(t, checkErr) + + // Check the result. + // select * from stock + if expectQuery != nil { + rs, err := conn.QueryContext(ctx, expectQuery.sql) + require.NoError(t, err) + if expectQuery.rows == nil { + require.Nil(t, rs) + } else { + cli.CheckRows(t, rs, expectQuery.rows[0]) + } + } + d.SetHook(originalCallback) +} + +func jobStateOrLastSubJobState(job *model.Job) model.SchemaState { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + subs := job.MultiSchemaInfo.SubJobs + return subs[len(subs)-1].SchemaState + } + return job.SchemaState +} + //revive:enable:exported diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index 9482a51b6cd14..d9c9b6ce4fdd8 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -3089,6 +3089,11 @@ func TestTypeAndCharsetOfSendLongData(t *testing.T) { ts.RunTestTypeAndCharsetOfSendLongData(t) } +func TestIssue53634(t *testing.T) { + ts := servertestkit.CreateTidbTestSuiteWithDDLLease(t, "20s") + ts.RunTestIssue53634(t, ts.Domain) +} + func TestAuthSocket(t *testing.T) { defer server2.ClearOSUserForAuthSocket() diff --git a/pkg/server/tests/servertestkit/testkit.go b/pkg/server/tests/servertestkit/testkit.go index d1efdf4cd37f3..348ac91a3bee4 100644 --- a/pkg/server/tests/servertestkit/testkit.go +++ b/pkg/server/tests/servertestkit/testkit.go @@ -19,7 +19,9 @@ import ( "database/sql" "sync" "testing" + "time" + "github.com/cockroachdb/errors" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" @@ -35,6 +37,7 @@ import ( topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state" "github.com/stretchr/testify/require" "go.opencensus.io/stats/view" + "go.uber.org/zap" ) // TidbTestSuite is a test suite for tidb @@ -48,13 +51,37 @@ type TidbTestSuite struct { // CreateTidbTestSuite creates a test suite for tidb func CreateTidbTestSuite(t *testing.T) *TidbTestSuite { + cfg := newTestConfig() + return CreateTidbTestSuiteWithCfg(t, cfg) +} + +// CreateTidbTestSuiteWithDDLLease creates a test suite with DDL lease for tidb. +func CreateTidbTestSuiteWithDDLLease(t *testing.T, ddlLease string) *TidbTestSuite { + cfg := newTestConfig() + cfg.Lease = ddlLease + return CreateTidbTestSuiteWithCfg(t, cfg) +} + +func newTestConfig() *config.Config { cfg := util.NewTestConfig() cfg.Port = 0 cfg.Status.ReportStatus = true cfg.Status.StatusPort = 0 cfg.Status.RecordDBLabel = true cfg.Performance.TCPKeepAlive = true - return CreateTidbTestSuiteWithCfg(t, cfg) + return cfg +} + +// parseDuration parses lease argument string. +func parseDuration(lease string) (time.Duration, error) { + dur, err := time.ParseDuration(lease) + if err != nil { + dur, err = time.ParseDuration(lease + "s") + } + if err != nil || dur < 0 { + return 0, errors.Newf("invalid lease duration", zap.String("lease", lease)) + } + return dur, nil } // CreateTidbTestSuiteWithCfg creates a test suite for tidb with config @@ -66,6 +93,9 @@ func CreateTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *TidbTestSuite ts.Store, err = mockstore.NewMockStore() session.DisableStats4Test() require.NoError(t, err) + ddlLeaseDuration, err := parseDuration(cfg.Lease) + require.NoError(t, err) + session.SetSchemaLease(ddlLeaseDuration) ts.Domain, err = session.BootstrapSession(ts.Store) require.NoError(t, err) ts.Tidbdrv = srv.NewTiDBDriver(ts.Store) From 77bf840b1334c8e33aef4c264a621fefb13f85b3 Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 24 Jun 2024 20:35:13 +0800 Subject: [PATCH 2/3] *: tiny update bazel --- pkg/planner/core/plan_cache.go | 2 +- .../internal/testserverclient/BUILD.bazel | 3 ++ .../testserverclient/server_client.go | 47 +++++++++---------- pkg/server/tests/servertestkit/BUILD.bazel | 2 + 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/pkg/planner/core/plan_cache.go b/pkg/planner/core/plan_cache.go index d4b2b6fce296b..dbd77dce895db 100644 --- a/pkg/planner/core/plan_cache.go +++ b/pkg/planner/core/plan_cache.go @@ -122,7 +122,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep schemaNotMatch = true continue } - if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || tbl.Meta().Revision != newTbl.Meta().Revision { + if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) { schemaNotMatch = true } stmt.tbls[i] = newTbl diff --git a/pkg/server/internal/testserverclient/BUILD.bazel b/pkg/server/internal/testserverclient/BUILD.bazel index f90bad7322d82..322b6a91f1446 100644 --- a/pkg/server/internal/testserverclient/BUILD.bazel +++ b/pkg/server/internal/testserverclient/BUILD.bazel @@ -6,9 +6,12 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/server/internal/testserverclient", visibility = ["//pkg/server:__subpackages__"], deps = [ + "//pkg/ddl/util/callback", + "//pkg/domain", "//pkg/errno", "//pkg/kv", "//pkg/metrics", + "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/server", "//pkg/testkit", diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index 31d396c7eef2e..86140d33ee451 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -2798,12 +2798,12 @@ func (cli *TestServerClient) getNewDB(t *testing.T, overrider configOverrider) * return testkit.NewDBTestKit(t, db) } -func MustExec(t *testing.T, ctx context.Context, conn *sql.Conn, sql string) { +func MustExec(ctx context.Context, t *testing.T, conn *sql.Conn, sql string) { _, err := conn.QueryContext(ctx, sql) require.NoError(t, err) } -func MustQuery(t *testing.T, ctx context.Context, cli *TestServerClient, conn *sql.Conn, sql string) { +func MustQuery(ctx context.Context, t *testing.T, cli *TestServerClient, conn *sql.Conn, sql string) { rs, err := conn.QueryContext(ctx, sql) require.NoError(t, err) if rs != nil { @@ -2813,9 +2813,8 @@ func MustQuery(t *testing.T, ctx context.Context, cli *TestServerClient, conn *s } type sqlWithErr struct { - sql string - expectErr error - stmt *sql.Stmt + stmt *sql.Stmt + sql string } type expectQuery struct { @@ -2831,9 +2830,9 @@ func (cli *TestServerClient) RunTestIssue53634(t *testing.T, dom *domain.Domain) conn, err := dbt.GetDB().Conn(ctx) require.NoError(t, err) - MustExec(t, ctx, conn, "create database test_db_state default charset utf8 default collate utf8_bin") - MustExec(t, ctx, conn, "use test_db_state") - MustExec(t, ctx, conn, `CREATE TABLE stock ( + MustExec(ctx, t, conn, "create database test_db_state default charset utf8 default collate utf8_bin") + MustExec(ctx, t, conn, "use test_db_state") + MustExec(ctx, t, conn, `CREATE TABLE stock ( a int NOT NULL, b char(30) NOT NULL, c int, @@ -2841,18 +2840,18 @@ func (cli *TestServerClient) RunTestIssue53634(t *testing.T, dom *domain.Domain) PRIMARY KEY(a,b) ) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment'; `) - MustExec(t, ctx, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')") - MustExec(t, ctx, conn, "alter table stock add column cct_1 int default 10") - MustExec(t, ctx, conn, "alter table stock modify cct_1 json") - MustExec(t, ctx, conn, "alter table stock add column adc_1 smallint") - defer MustExec(t, ctx, conn, "drop database test_db_state") + MustExec(ctx, t, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')") + MustExec(ctx, t, conn, "alter table stock add column cct_1 int default 10") + MustExec(ctx, t, conn, "alter table stock modify cct_1 json") + MustExec(ctx, t, conn, "alter table stock add column adc_1 smallint") + defer MustExec(ctx, t, conn, "drop database test_db_state") sqls := make([]sqlWithErr, 5) - sqls[0] = sqlWithErr{"begin", nil, nil} - sqls[1] = sqlWithErr{"SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE", nil, nil} - sqls[2] = sqlWithErr{"UPDATE stock SET c = ? WHERE a= ? AND b = 'a'", nil, nil} - sqls[3] = sqlWithErr{"UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'", nil, nil} - sqls[4] = sqlWithErr{"commit", nil, nil} + sqls[0] = sqlWithErr{nil, "begin"} + sqls[1] = sqlWithErr{nil, "SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE"} + sqls[2] = sqlWithErr{nil, "UPDATE stock SET c = ? WHERE a= ? AND b = 'a'"} + sqls[3] = sqlWithErr{nil, "UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'"} + sqls[4] = sqlWithErr{nil, "commit"} dropColumnSQL := "alter table stock drop column cct_1" query := &expectQuery{sql: "select * from stock;", rows: []string{"1 a 101 x \n2 b 102 z "}} runTestInSchemaState(t, conn, cli, dom, model.StateWriteReorganization, true, dropColumnSQL, sqls, query) @@ -2871,7 +2870,7 @@ func runTestInSchemaState( expectQuery *expectQuery, ) { ctx := context.Background() - MustExec(t, ctx, conn, "use test_db_state") + MustExec(ctx, t, conn, "use test_db_state") callback := &callback.TestDDLCallback{Do: dom} prevState := model.StateNone @@ -2885,14 +2884,14 @@ func runTestInSchemaState( err := dbt.GetDB().Close() require.NoError(t, err) }() - MustExec(t, ctx, conn1, "use test_db_state") + MustExec(ctx, t, conn1, "use test_db_state") for i, sqlWithErr := range sqlWithErrs { // Start the test txn. // Step 1: begin(when i = 0). if i == 0 || i == len(sqlWithErrs)-1 { sqlWithErr := sqlWithErrs[i] - MustExec(t, ctx, conn1, sqlWithErr.sql) + MustExec(ctx, t, conn1, sqlWithErr.sql) } else { // Step 2: prepare stmts. // SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE @@ -2907,7 +2906,7 @@ func runTestInSchemaState( // Step 3: begin. sqlWithErr := sqlWithErrs[0] - MustExec(t, ctx, conn1, sqlWithErr.sql) + MustExec(ctx, t, conn1, sqlWithErr.sql) prevState = model.StateNone state = model.StateWriteOnly @@ -2933,7 +2932,7 @@ func runTestInSchemaState( _, err = sqlWithErr.stmt.ExecContext(ctx, 100+i, i) require.NoError(t, err) } else { - MustQuery(t, ctx, cli, conn1, sqlWithErr.sql) + MustQuery(ctx, t, cli, conn1, sqlWithErr.sql) } } } @@ -2945,7 +2944,7 @@ func runTestInSchemaState( d := dom.DDL() originalCallback := d.GetHook() d.SetHook(callback) - MustExec(t, ctx, conn, dropColumnSQL) + MustExec(ctx, t, conn, dropColumnSQL) require.NoError(t, checkErr) // Check the result. diff --git a/pkg/server/tests/servertestkit/BUILD.bazel b/pkg/server/tests/servertestkit/BUILD.bazel index 9b2378327973f..65010930b6cfd 100644 --- a/pkg/server/tests/servertestkit/BUILD.bazel +++ b/pkg/server/tests/servertestkit/BUILD.bazel @@ -19,7 +19,9 @@ go_library( "//pkg/util/cpuprofile", "//pkg/util/topsql/collector/mock", "//pkg/util/topsql/state", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", "@io_opencensus_go//stats/view", + "@org_uber_go_zap//:zap", ], ) From 43d716c239a1c4a3f4e37d08cf3f78610d14d9ef Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 26 Jun 2024 12:00:15 +0800 Subject: [PATCH 3/3] *: add comments --- pkg/planner/core/plan_cache.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/planner/core/plan_cache.go b/pkg/planner/core/plan_cache.go index dbd77dce895db..781e83d29e289 100644 --- a/pkg/planner/core/plan_cache.go +++ b/pkg/planner/core/plan_cache.go @@ -122,6 +122,11 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep schemaNotMatch = true continue } + // The revision of tbl and newTbl may not be the same. + // Example: + // The version of stmt.tbls[i] is taken from the prepare statement and is revision v1. + // When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1. + // The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update. if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) { schemaNotMatch = true }