diff --git a/pkg/planner/core/plan_cache.go b/pkg/planner/core/plan_cache.go index 436d6d6fcbc1b..19c19dc7c49f0 100644 --- a/pkg/planner/core/plan_cache.go +++ b/pkg/planner/core/plan_cache.go @@ -101,7 +101,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep // step 3: add metadata lock and check each table's schema version schemaNotMatch := false for i := 0; i < len(stmt.dbName); i++ { - _, ok := is.TableByID(stmt.tbls[i].Meta().ID) + tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID) if !ok { tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name) if err != nil { @@ -116,7 +116,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep schemaNotMatch = true continue } - if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision { + if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || tbl.Meta().Revision != newTbl.Meta().Revision { schemaNotMatch = true } stmt.tbls[i] = newTbl diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index fa6d42f42935e..31d396c7eef2e 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -37,9 +37,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/ddl/util/callback" + "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/parser/model" tmysql "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/server" "github.com/pingcap/tidb/pkg/testkit" @@ -2788,4 +2791,183 @@ func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) { }) } +func (cli *TestServerClient) getNewDB(t *testing.T, overrider configOverrider) *testkit.DBTestKit { + db, err := sql.Open("mysql", cli.GetDSN(overrider)) + require.NoError(t, err) + + return testkit.NewDBTestKit(t, db) +} + +func MustExec(t *testing.T, ctx context.Context, conn *sql.Conn, sql string) { + _, err := conn.QueryContext(ctx, sql) + require.NoError(t, err) +} + +func MustQuery(t *testing.T, ctx context.Context, cli *TestServerClient, conn *sql.Conn, sql string) { + rs, err := conn.QueryContext(ctx, sql) + require.NoError(t, err) + if rs != nil { + cli.Rows(t, rs) + rs.Close() + } +} + +type sqlWithErr struct { + sql string + expectErr error + stmt *sql.Stmt +} + +type expectQuery struct { + sql string + rows []string +} + +func (cli *TestServerClient) RunTestIssue53634(t *testing.T, dom *domain.Domain) { + cli.RunTests(t, func(config *mysql.Config) { + config.MaxAllowedPacket = 1024 + }, func(dbt *testkit.DBTestKit) { + ctx := context.Background() + + conn, err := dbt.GetDB().Conn(ctx) + require.NoError(t, err) + MustExec(t, ctx, conn, "create database test_db_state default charset utf8 default collate utf8_bin") + MustExec(t, ctx, conn, "use test_db_state") + MustExec(t, ctx, conn, `CREATE TABLE stock ( + a int NOT NULL, + b char(30) NOT NULL, + c int, + d char(64), + PRIMARY KEY(a,b) +) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment'; +`) + MustExec(t, ctx, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')") + MustExec(t, ctx, conn, "alter table stock add column cct_1 int default 10") + MustExec(t, ctx, conn, "alter table stock modify cct_1 json") + MustExec(t, ctx, conn, "alter table stock add column adc_1 smallint") + defer MustExec(t, ctx, conn, "drop database test_db_state") + + sqls := make([]sqlWithErr, 5) + sqls[0] = sqlWithErr{"begin", nil, nil} + sqls[1] = sqlWithErr{"SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE", nil, nil} + sqls[2] = sqlWithErr{"UPDATE stock SET c = ? WHERE a= ? AND b = 'a'", nil, nil} + sqls[3] = sqlWithErr{"UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'", nil, nil} + sqls[4] = sqlWithErr{"commit", nil, nil} + dropColumnSQL := "alter table stock drop column cct_1" + query := &expectQuery{sql: "select * from stock;", rows: []string{"1 a 101 x \n2 b 102 z "}} + runTestInSchemaState(t, conn, cli, dom, model.StateWriteReorganization, true, dropColumnSQL, sqls, query) + }) +} + +func runTestInSchemaState( + t *testing.T, + conn *sql.Conn, + cli *TestServerClient, + dom *domain.Domain, + state model.SchemaState, + isOnJobUpdated bool, + dropColumnSQL string, + sqlWithErrs []sqlWithErr, + expectQuery *expectQuery, +) { + ctx := context.Background() + MustExec(t, ctx, conn, "use test_db_state") + + callback := &callback.TestDDLCallback{Do: dom} + prevState := model.StateNone + var checkErr error + dbt := cli.getNewDB(t, func(config *mysql.Config) { + config.MaxAllowedPacket = 1024 + }) + conn1, err := dbt.GetDB().Conn(ctx) + require.NoError(t, err) + defer func() { + err := dbt.GetDB().Close() + require.NoError(t, err) + }() + MustExec(t, ctx, conn1, "use test_db_state") + + for i, sqlWithErr := range sqlWithErrs { + // Start the test txn. + // Step 1: begin(when i = 0). + if i == 0 || i == len(sqlWithErrs)-1 { + sqlWithErr := sqlWithErrs[i] + MustExec(t, ctx, conn1, sqlWithErr.sql) + } else { + // Step 2: prepare stmts. + // SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE + // UPDATE stock SET c = ? WHERE a= ? AND b = 'a' + // UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b' + stmt, err := conn1.PrepareContext(ctx, sqlWithErr.sql) + require.NoError(t, err) + sqlWithErr.stmt = stmt + sqlWithErrs[i] = sqlWithErr + } + } + + // Step 3: begin. + sqlWithErr := sqlWithErrs[0] + MustExec(t, ctx, conn1, sqlWithErr.sql) + + prevState = model.StateNone + state = model.StateWriteOnly + cbFunc1 := func(job *model.Job) { + if jobStateOrLastSubJobState(job) == prevState || checkErr != nil { + return + } + prevState = jobStateOrLastSubJobState(job) + if prevState != state { + return + } + // Step 4: exec stmts in write-only state (dropping a colum). + // SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE, args:(1,"a"),(2,"b") + // UPDATE stock SET c = ? WHERE a= ? AND b = 'a', args:(100+1, 1) + // UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b', args:(100+2, 2) + // commit. + sqls := sqlWithErrs[1:] + for i, sqlWithErr := range sqls { + if i == 0 { + _, err = sqlWithErr.stmt.ExecContext(ctx, 1, "a", 2, "b") + require.NoError(t, err) + } else if i == 1 || i == 2 { + _, err = sqlWithErr.stmt.ExecContext(ctx, 100+i, i) + require.NoError(t, err) + } else { + MustQuery(t, ctx, cli, conn1, sqlWithErr.sql) + } + } + } + if isOnJobUpdated { + callback.OnJobUpdatedExported.Store(&cbFunc1) + } else { + callback.OnJobRunBeforeExported = cbFunc1 + } + d := dom.DDL() + originalCallback := d.GetHook() + d.SetHook(callback) + MustExec(t, ctx, conn, dropColumnSQL) + require.NoError(t, checkErr) + + // Check the result. + // select * from stock + if expectQuery != nil { + rs, err := conn.QueryContext(ctx, expectQuery.sql) + require.NoError(t, err) + if expectQuery.rows == nil { + require.Nil(t, rs) + } else { + cli.CheckRows(t, rs, expectQuery.rows[0]) + } + } + d.SetHook(originalCallback) +} + +func jobStateOrLastSubJobState(job *model.Job) model.SchemaState { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + subs := job.MultiSchemaInfo.SubJobs + return subs[len(subs)-1].SchemaState + } + return job.SchemaState +} + //revive:enable:exported diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index 9482a51b6cd14..d9c9b6ce4fdd8 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -3089,6 +3089,11 @@ func TestTypeAndCharsetOfSendLongData(t *testing.T) { ts.RunTestTypeAndCharsetOfSendLongData(t) } +func TestIssue53634(t *testing.T) { + ts := servertestkit.CreateTidbTestSuiteWithDDLLease(t, "20s") + ts.RunTestIssue53634(t, ts.Domain) +} + func TestAuthSocket(t *testing.T) { defer server2.ClearOSUserForAuthSocket() diff --git a/pkg/server/tests/servertestkit/testkit.go b/pkg/server/tests/servertestkit/testkit.go index d1efdf4cd37f3..348ac91a3bee4 100644 --- a/pkg/server/tests/servertestkit/testkit.go +++ b/pkg/server/tests/servertestkit/testkit.go @@ -19,7 +19,9 @@ import ( "database/sql" "sync" "testing" + "time" + "github.com/cockroachdb/errors" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" @@ -35,6 +37,7 @@ import ( topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state" "github.com/stretchr/testify/require" "go.opencensus.io/stats/view" + "go.uber.org/zap" ) // TidbTestSuite is a test suite for tidb @@ -48,13 +51,37 @@ type TidbTestSuite struct { // CreateTidbTestSuite creates a test suite for tidb func CreateTidbTestSuite(t *testing.T) *TidbTestSuite { + cfg := newTestConfig() + return CreateTidbTestSuiteWithCfg(t, cfg) +} + +// CreateTidbTestSuiteWithDDLLease creates a test suite with DDL lease for tidb. +func CreateTidbTestSuiteWithDDLLease(t *testing.T, ddlLease string) *TidbTestSuite { + cfg := newTestConfig() + cfg.Lease = ddlLease + return CreateTidbTestSuiteWithCfg(t, cfg) +} + +func newTestConfig() *config.Config { cfg := util.NewTestConfig() cfg.Port = 0 cfg.Status.ReportStatus = true cfg.Status.StatusPort = 0 cfg.Status.RecordDBLabel = true cfg.Performance.TCPKeepAlive = true - return CreateTidbTestSuiteWithCfg(t, cfg) + return cfg +} + +// parseDuration parses lease argument string. +func parseDuration(lease string) (time.Duration, error) { + dur, err := time.ParseDuration(lease) + if err != nil { + dur, err = time.ParseDuration(lease + "s") + } + if err != nil || dur < 0 { + return 0, errors.Newf("invalid lease duration", zap.String("lease", lease)) + } + return dur, nil } // CreateTidbTestSuiteWithCfg creates a test suite for tidb with config @@ -66,6 +93,9 @@ func CreateTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *TidbTestSuite ts.Store, err = mockstore.NewMockStore() session.DisableStats4Test() require.NoError(t, err) + ddlLeaseDuration, err := parseDuration(cfg.Lease) + require.NoError(t, err) + session.SetSchemaLease(ddlLeaseDuration) ts.Domain, err = session.BootstrapSession(ts.Store) require.NoError(t, err) ts.Tidbdrv = srv.NewTiDBDriver(ts.Store)