Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix a bug that update statement uses point get and update plan with different tblInfo (#54183) #54258

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
Expand All @@ -119,7 +119,12 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
// The revision of tbl and newTbl may not be the same.
// Example:
// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
Expand Down
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ go_test(
"//config",
"//ddl",
"//ddl/util",
"//ddl/util/callback",
"//domain",
"//domain/infosync",
"//errno",
Expand Down
197 changes: 197 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util/callback"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testdata"
Expand Down Expand Up @@ -2677,3 +2680,197 @@ func TestSeverHealth(t *testing.T) {
}
require.True(t, server.health.Load(), "server should be healthy")
}

func (cli *testServerClient) getNewDB(t *testing.T, overrider configOverrider) *testkit.DBTestKit {
db, err := sql.Open("mysql", cli.getDSN(overrider))
require.NoError(t, err)

return testkit.NewDBTestKit(t, db)
}

func MustExec(ctx context.Context, t *testing.T, conn *sql.Conn, sql string) {
_, err := conn.QueryContext(ctx, sql)
require.NoError(t, err)
}

func MustQueryWithRetry(ctx context.Context, t *testing.T, cli *tidbTestSuite, conn *sql.Conn, stmt string) {
var rs *sql.Rows
var err error
retryCnt := 1
for i := 0; i < 20; i++ {
rs, err = conn.QueryContext(ctx, stmt)
if err == nil {
break
}
if !strings.Contains(err.Error(), "Information schema is changed") {
break
}
retryCnt++
time.Sleep(100 * time.Millisecond)
}
t.Logf("running test case retry count:%v, stmt:%v", retryCnt, stmt)
require.NoError(t, err)
if rs != nil {
cli.Rows(t, rs)
rs.Close()
}
}

type sqlWithErr struct {
stmt *sql.Stmt
sql string
}

type expectQuery struct {
sql string
rows []string
}

func (cli *testServerClient) runTestIssue53634(t *testing.T, ts *tidbTestSuite, dom *domain.Domain) {
cli.runTestsOnNewDB(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
}, "MDL", func(dbt *testkit.DBTestKit) {
ctx := context.Background()

conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
MustExec(ctx, t, conn, "create database test_db_state default charset utf8 default collate utf8_bin")
MustExec(ctx, t, conn, "use test_db_state")
MustExec(ctx, t, conn, `CREATE TABLE stock (
a int NOT NULL,
b char(30) NOT NULL,
c int,
d char(64),
PRIMARY KEY(a,b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment';
`)
MustExec(ctx, t, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')")
MustExec(ctx, t, conn, "alter table stock add column cct_1 int default 10")
MustExec(ctx, t, conn, "alter table stock modify cct_1 json")
MustExec(ctx, t, conn, "alter table stock add column adc_1 smallint")
defer MustExec(ctx, t, conn, "drop database test_db_state")

sqls := make([]sqlWithErr, 5)
sqls[0] = sqlWithErr{nil, "begin"}
sqls[1] = sqlWithErr{nil, "SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE"}
sqls[2] = sqlWithErr{nil, "UPDATE stock SET c = ? WHERE a= ? AND b = 'a'"}
sqls[3] = sqlWithErr{nil, "UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'"}
sqls[4] = sqlWithErr{nil, "commit"}
dropColumnSQL := "alter table stock drop column cct_1"
query := &expectQuery{sql: "select * from stock;", rows: []string{"1 a 101 x <nil>\n2 b 102 z <nil>"}}
runTestInSchemaState(t, conn, ts, dom, model.StateWriteReorganization, true, dropColumnSQL, sqls, query)
})
}

func runTestInSchemaState(
t *testing.T,
conn *sql.Conn,
cli *tidbTestSuite,
dom *domain.Domain,
state model.SchemaState,
isOnJobUpdated bool,
dropColumnSQL string,
sqlWithErrs []sqlWithErr,
expectQuery *expectQuery,
) {
ctx := context.Background()
MustExec(ctx, t, conn, "use test_db_state")

callback := &callback.TestDDLCallback{Do: dom}
prevState := model.StateNone
var checkErr error
dbt := cli.getNewDB(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
})
conn1, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
defer func() {
err := dbt.GetDB().Close()
require.NoError(t, err)
}()
MustExec(ctx, t, conn1, "use test_db_state")

for i, sqlWithErr := range sqlWithErrs {
// Start the test txn.
// Step 1: begin(when i = 0).
if i == 0 || i == len(sqlWithErrs)-1 {
sqlWithErr := sqlWithErrs[i]
MustExec(ctx, t, conn1, sqlWithErr.sql)
} else {
// Step 2: prepare stmts.
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a'
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'
stmt, err := conn1.PrepareContext(ctx, sqlWithErr.sql)
require.NoError(t, err)
sqlWithErr.stmt = stmt
sqlWithErrs[i] = sqlWithErr
}
}

// Step 3: begin.
sqlWithErr := sqlWithErrs[0]
MustExec(ctx, t, conn1, sqlWithErr.sql)

prevState = model.StateNone
state = model.StateWriteOnly
cbFunc1 := func(job *model.Job) {
if jobStateOrLastSubJobState(job) == prevState || checkErr != nil {
return
}
prevState = jobStateOrLastSubJobState(job)
if prevState != state {
return
}
// Step 4: exec stmts in write-only state (dropping a colum).
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE, args:(1,"a"),(2,"b")
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a', args:(100+1, 1)
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b', args:(100+2, 2)
// commit.
sqls := sqlWithErrs[1:]
for i, sqlWithErr := range sqls {
if i == 0 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 1, "a", 2, "b")
require.NoError(t, err)
} else if i == 1 || i == 2 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 100+i, i)
require.NoError(t, err)
} else {
MustQueryWithRetry(ctx, t, cli, conn1, sqlWithErr.sql)
}
}
}
if isOnJobUpdated {
callback.OnJobUpdatedExported.Store(&cbFunc1)
} else {
callback.OnJobRunBeforeExported = cbFunc1
}
d := dom.DDL()
originalCallback := d.GetHook()
d.SetHook(callback)
MustExec(ctx, t, conn, dropColumnSQL)
require.NoError(t, checkErr)

// Check the result.
// select * from stock
if expectQuery != nil {
rs, err := conn.QueryContext(ctx, expectQuery.sql)
require.NoError(t, err)
if expectQuery.rows == nil {
require.Nil(t, rs)
} else {
cli.checkRows(t, rs, expectQuery.rows[0])
}
}
d.SetHook(originalCallback)
}

func jobStateOrLastSubJobState(job *model.Job) model.SchemaState {
if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
subs := job.MultiSchemaInfo.SubJobs
return subs[len(subs)-1].SchemaState
}
return job.SchemaState
}

//revive:enable:exported
25 changes: 25 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ func createTidbTestSuite(t *testing.T) *tidbTestSuite {
return createTidbTestSuiteWithCfg(t, cfg)
}

// parseDuration parses lease argument string.
func parseDuration(lease string) (time.Duration, error) {
dur, err := time.ParseDuration(lease)
if err != nil {
dur, err = time.ParseDuration(lease + "s")
}
if err != nil || dur < 0 {
return 0, errors.Errorf("invalid lease duration: %v", lease)
}
return dur, nil
}

func createTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *tidbTestSuite {
ts := &tidbTestSuite{testServerClient: newTestServerClient()}

Expand All @@ -99,6 +111,9 @@ func createTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *tidbTestSuite
ts.store, err = mockstore.NewMockStore()
session.DisableStats4Test()
require.NoError(t, err)
ddlLeaseDuration, err := parseDuration(cfg.Lease)
require.NoError(t, err)
session.SetSchemaLease(ddlLeaseDuration)
ts.domain, err = session.BootstrapSession(ts.store)
require.NoError(t, err)
ts.tidbdrv = NewTiDBDriver(ts.store)
Expand Down Expand Up @@ -3221,6 +3236,16 @@ func TestProxyProtocolWithIpNoFallbackable(t *testing.T) {
db.Close()
}

func TestIssue53634(t *testing.T) {
cfg := newTestConfig()
cfg.Lease = "20s"
cfg.Port = 4123
cfg.Status.StatusPort = 10088
ts := createTidbTestSuiteWithCfg(t, cfg)

ts.runTestIssue53634(t, ts, ts.domain)
}

func TestAuthSocket(t *testing.T) {
defer mockOSUserForAuthSocketTest.Store(nil)

Expand Down