Skip to content

Commit

Permalink
*: fix a bug that update statement uses point get and update plan wit…
Browse files Browse the repository at this point in the history
…h different tblInfo (#54183) (#54258)

close #53634
  • Loading branch information
ti-chi-bot authored Nov 12, 2024
1 parent cc699b6 commit fa9805b
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 2 deletions.
9 changes: 7 additions & 2 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
Expand All @@ -119,7 +119,12 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
// The revision of tbl and newTbl may not be the same.
// Example:
// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
Expand Down
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ go_test(
"//config",
"//ddl",
"//ddl/util",
"//ddl/util/callback",
"//domain",
"//domain/infosync",
"//errno",
Expand Down
197 changes: 197 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util/callback"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testdata"
Expand Down Expand Up @@ -2677,3 +2680,197 @@ func TestSeverHealth(t *testing.T) {
}
require.True(t, server.health.Load(), "server should be healthy")
}

func (cli *testServerClient) getNewDB(t *testing.T, overrider configOverrider) *testkit.DBTestKit {
db, err := sql.Open("mysql", cli.getDSN(overrider))
require.NoError(t, err)

return testkit.NewDBTestKit(t, db)
}

func MustExec(ctx context.Context, t *testing.T, conn *sql.Conn, sql string) {
_, err := conn.QueryContext(ctx, sql)
require.NoError(t, err)
}

func MustQueryWithRetry(ctx context.Context, t *testing.T, cli *tidbTestSuite, conn *sql.Conn, stmt string) {
var rs *sql.Rows
var err error
retryCnt := 1
for i := 0; i < 20; i++ {
rs, err = conn.QueryContext(ctx, stmt)
if err == nil {
break
}
if !strings.Contains(err.Error(), "Information schema is changed") {
break
}
retryCnt++
time.Sleep(100 * time.Millisecond)
}
t.Logf("running test case retry count:%v, stmt:%v", retryCnt, stmt)
require.NoError(t, err)
if rs != nil {
cli.Rows(t, rs)
rs.Close()
}
}

type sqlWithErr struct {
stmt *sql.Stmt
sql string
}

type expectQuery struct {
sql string
rows []string
}

func (cli *testServerClient) runTestIssue53634(t *testing.T, ts *tidbTestSuite, dom *domain.Domain) {
cli.runTestsOnNewDB(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
}, "MDL", func(dbt *testkit.DBTestKit) {
ctx := context.Background()

conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
MustExec(ctx, t, conn, "create database test_db_state default charset utf8 default collate utf8_bin")
MustExec(ctx, t, conn, "use test_db_state")
MustExec(ctx, t, conn, `CREATE TABLE stock (
a int NOT NULL,
b char(30) NOT NULL,
c int,
d char(64),
PRIMARY KEY(a,b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment';
`)
MustExec(ctx, t, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')")
MustExec(ctx, t, conn, "alter table stock add column cct_1 int default 10")
MustExec(ctx, t, conn, "alter table stock modify cct_1 json")
MustExec(ctx, t, conn, "alter table stock add column adc_1 smallint")
defer MustExec(ctx, t, conn, "drop database test_db_state")

sqls := make([]sqlWithErr, 5)
sqls[0] = sqlWithErr{nil, "begin"}
sqls[1] = sqlWithErr{nil, "SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE"}
sqls[2] = sqlWithErr{nil, "UPDATE stock SET c = ? WHERE a= ? AND b = 'a'"}
sqls[3] = sqlWithErr{nil, "UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'"}
sqls[4] = sqlWithErr{nil, "commit"}
dropColumnSQL := "alter table stock drop column cct_1"
query := &expectQuery{sql: "select * from stock;", rows: []string{"1 a 101 x <nil>\n2 b 102 z <nil>"}}
runTestInSchemaState(t, conn, ts, dom, model.StateWriteReorganization, true, dropColumnSQL, sqls, query)
})
}

func runTestInSchemaState(
t *testing.T,
conn *sql.Conn,
cli *tidbTestSuite,
dom *domain.Domain,
state model.SchemaState,
isOnJobUpdated bool,
dropColumnSQL string,
sqlWithErrs []sqlWithErr,
expectQuery *expectQuery,
) {
ctx := context.Background()
MustExec(ctx, t, conn, "use test_db_state")

callback := &callback.TestDDLCallback{Do: dom}
prevState := model.StateNone
var checkErr error
dbt := cli.getNewDB(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
})
conn1, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
defer func() {
err := dbt.GetDB().Close()
require.NoError(t, err)
}()
MustExec(ctx, t, conn1, "use test_db_state")

for i, sqlWithErr := range sqlWithErrs {
// Start the test txn.
// Step 1: begin(when i = 0).
if i == 0 || i == len(sqlWithErrs)-1 {
sqlWithErr := sqlWithErrs[i]
MustExec(ctx, t, conn1, sqlWithErr.sql)
} else {
// Step 2: prepare stmts.
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a'
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'
stmt, err := conn1.PrepareContext(ctx, sqlWithErr.sql)
require.NoError(t, err)
sqlWithErr.stmt = stmt
sqlWithErrs[i] = sqlWithErr
}
}

// Step 3: begin.
sqlWithErr := sqlWithErrs[0]
MustExec(ctx, t, conn1, sqlWithErr.sql)

prevState = model.StateNone
state = model.StateWriteOnly
cbFunc1 := func(job *model.Job) {
if jobStateOrLastSubJobState(job) == prevState || checkErr != nil {
return
}
prevState = jobStateOrLastSubJobState(job)
if prevState != state {
return
}
// Step 4: exec stmts in write-only state (dropping a colum).
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE, args:(1,"a"),(2,"b")
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a', args:(100+1, 1)
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b', args:(100+2, 2)
// commit.
sqls := sqlWithErrs[1:]
for i, sqlWithErr := range sqls {
if i == 0 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 1, "a", 2, "b")
require.NoError(t, err)
} else if i == 1 || i == 2 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 100+i, i)
require.NoError(t, err)
} else {
MustQueryWithRetry(ctx, t, cli, conn1, sqlWithErr.sql)
}
}
}
if isOnJobUpdated {
callback.OnJobUpdatedExported.Store(&cbFunc1)
} else {
callback.OnJobRunBeforeExported = cbFunc1
}
d := dom.DDL()
originalCallback := d.GetHook()
d.SetHook(callback)
MustExec(ctx, t, conn, dropColumnSQL)
require.NoError(t, checkErr)

// Check the result.
// select * from stock
if expectQuery != nil {
rs, err := conn.QueryContext(ctx, expectQuery.sql)
require.NoError(t, err)
if expectQuery.rows == nil {
require.Nil(t, rs)
} else {
cli.checkRows(t, rs, expectQuery.rows[0])
}
}
d.SetHook(originalCallback)
}

func jobStateOrLastSubJobState(job *model.Job) model.SchemaState {
if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
subs := job.MultiSchemaInfo.SubJobs
return subs[len(subs)-1].SchemaState
}
return job.SchemaState
}

//revive:enable:exported
25 changes: 25 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ func createTidbTestSuite(t *testing.T) *tidbTestSuite {
return createTidbTestSuiteWithCfg(t, cfg)
}

// parseDuration parses lease argument string.
func parseDuration(lease string) (time.Duration, error) {
dur, err := time.ParseDuration(lease)
if err != nil {
dur, err = time.ParseDuration(lease + "s")
}
if err != nil || dur < 0 {
return 0, errors.Errorf("invalid lease duration: %v", lease)
}
return dur, nil
}

func createTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *tidbTestSuite {
ts := &tidbTestSuite{testServerClient: newTestServerClient()}

Expand All @@ -99,6 +111,9 @@ func createTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *tidbTestSuite
ts.store, err = mockstore.NewMockStore()
session.DisableStats4Test()
require.NoError(t, err)
ddlLeaseDuration, err := parseDuration(cfg.Lease)
require.NoError(t, err)
session.SetSchemaLease(ddlLeaseDuration)
ts.domain, err = session.BootstrapSession(ts.store)
require.NoError(t, err)
ts.tidbdrv = NewTiDBDriver(ts.store)
Expand Down Expand Up @@ -3221,6 +3236,16 @@ func TestProxyProtocolWithIpNoFallbackable(t *testing.T) {
db.Close()
}

func TestIssue53634(t *testing.T) {
cfg := newTestConfig()
cfg.Lease = "20s"
cfg.Port = 4123
cfg.Status.StatusPort = 10088
ts := createTidbTestSuiteWithCfg(t, cfg)

ts.runTestIssue53634(t, ts, ts.domain)
}

func TestAuthSocket(t *testing.T) {
defer mockOSUserForAuthSocketTest.Store(nil)

Expand Down

0 comments on commit fa9805b

Please sign in to comment.