Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix a bug that update statement uses point get and update plan with different tblInfo (#54183) #54255

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
Expand All @@ -116,7 +116,12 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
// The revision of tbl and newTbl may not be the same.
// Example:
// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/server/internal/testserverclient",
visibility = ["//pkg/server:__subpackages__"],
deps = [
"//pkg/ddl/util/callback",
"//pkg/domain",
"//pkg/errno",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/testkit",
Expand Down
181 changes: 181 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -2788,4 +2791,182 @@ func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) {
})
}

func (cli *TestServerClient) getNewDB(t *testing.T, overrider configOverrider) *testkit.DBTestKit {
db, err := sql.Open("mysql", cli.GetDSN(overrider))
require.NoError(t, err)

return testkit.NewDBTestKit(t, db)
}

func MustExec(ctx context.Context, t *testing.T, conn *sql.Conn, sql string) {
_, err := conn.QueryContext(ctx, sql)
require.NoError(t, err)
}

func MustQuery(ctx context.Context, t *testing.T, cli *TestServerClient, conn *sql.Conn, sql string) {
rs, err := conn.QueryContext(ctx, sql)
require.NoError(t, err)
if rs != nil {
cli.Rows(t, rs)
rs.Close()
}
}

type sqlWithErr struct {
stmt *sql.Stmt
sql string
}

type expectQuery struct {
sql string
rows []string
}

func (cli *TestServerClient) RunTestIssue53634(t *testing.T, dom *domain.Domain) {
cli.RunTests(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
}, func(dbt *testkit.DBTestKit) {
ctx := context.Background()

conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
MustExec(ctx, t, conn, "create database test_db_state default charset utf8 default collate utf8_bin")
MustExec(ctx, t, conn, "use test_db_state")
MustExec(ctx, t, conn, `CREATE TABLE stock (
a int NOT NULL,
b char(30) NOT NULL,
c int,
d char(64),
PRIMARY KEY(a,b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment';
`)
MustExec(ctx, t, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')")
MustExec(ctx, t, conn, "alter table stock add column cct_1 int default 10")
MustExec(ctx, t, conn, "alter table stock modify cct_1 json")
MustExec(ctx, t, conn, "alter table stock add column adc_1 smallint")
defer MustExec(ctx, t, conn, "drop database test_db_state")

sqls := make([]sqlWithErr, 5)
sqls[0] = sqlWithErr{nil, "begin"}
sqls[1] = sqlWithErr{nil, "SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE"}
sqls[2] = sqlWithErr{nil, "UPDATE stock SET c = ? WHERE a= ? AND b = 'a'"}
sqls[3] = sqlWithErr{nil, "UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'"}
sqls[4] = sqlWithErr{nil, "commit"}
dropColumnSQL := "alter table stock drop column cct_1"
query := &expectQuery{sql: "select * from stock;", rows: []string{"1 a 101 x <nil>\n2 b 102 z <nil>"}}
runTestInSchemaState(t, conn, cli, dom, model.StateWriteReorganization, true, dropColumnSQL, sqls, query)
})
}

func runTestInSchemaState(
t *testing.T,
conn *sql.Conn,
cli *TestServerClient,
dom *domain.Domain,
state model.SchemaState,
isOnJobUpdated bool,
dropColumnSQL string,
sqlWithErrs []sqlWithErr,
expectQuery *expectQuery,
) {
ctx := context.Background()
MustExec(ctx, t, conn, "use test_db_state")

callback := &callback.TestDDLCallback{Do: dom}
prevState := model.StateNone
var checkErr error
dbt := cli.getNewDB(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
})
conn1, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
defer func() {
err := dbt.GetDB().Close()
require.NoError(t, err)
}()
MustExec(ctx, t, conn1, "use test_db_state")

for i, sqlWithErr := range sqlWithErrs {
// Start the test txn.
// Step 1: begin(when i = 0).
if i == 0 || i == len(sqlWithErrs)-1 {
sqlWithErr := sqlWithErrs[i]
MustExec(ctx, t, conn1, sqlWithErr.sql)
} else {
// Step 2: prepare stmts.
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a'
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'
stmt, err := conn1.PrepareContext(ctx, sqlWithErr.sql)
require.NoError(t, err)
sqlWithErr.stmt = stmt
sqlWithErrs[i] = sqlWithErr
}
}

// Step 3: begin.
sqlWithErr := sqlWithErrs[0]
MustExec(ctx, t, conn1, sqlWithErr.sql)

prevState = model.StateNone
state = model.StateWriteOnly
cbFunc1 := func(job *model.Job) {
if jobStateOrLastSubJobState(job) == prevState || checkErr != nil {
return
}
prevState = jobStateOrLastSubJobState(job)
if prevState != state {
return
}
// Step 4: exec stmts in write-only state (dropping a colum).
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE, args:(1,"a"),(2,"b")
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a', args:(100+1, 1)
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b', args:(100+2, 2)
// commit.
sqls := sqlWithErrs[1:]
for i, sqlWithErr := range sqls {
if i == 0 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 1, "a", 2, "b")
require.NoError(t, err)
} else if i == 1 || i == 2 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 100+i, i)
require.NoError(t, err)
} else {
MustQuery(ctx, t, cli, conn1, sqlWithErr.sql)
}
}
}
if isOnJobUpdated {
callback.OnJobUpdatedExported.Store(&cbFunc1)
} else {
callback.OnJobRunBeforeExported = cbFunc1
}
d := dom.DDL()
originalCallback := d.GetHook()
d.SetHook(callback)
MustExec(ctx, t, conn, dropColumnSQL)
require.NoError(t, checkErr)

// Check the result.
// select * from stock
if expectQuery != nil {
rs, err := conn.QueryContext(ctx, expectQuery.sql)
require.NoError(t, err)
if expectQuery.rows == nil {
require.Nil(t, rs)
} else {
cli.CheckRows(t, rs, expectQuery.rows[0])
}
}
d.SetHook(originalCallback)
}

func jobStateOrLastSubJobState(job *model.Job) model.SchemaState {
if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
subs := job.MultiSchemaInfo.SubJobs
return subs[len(subs)-1].SchemaState
}
return job.SchemaState
}

//revive:enable:exported
5 changes: 5 additions & 0 deletions pkg/server/tests/commontest/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3089,6 +3089,11 @@ func TestTypeAndCharsetOfSendLongData(t *testing.T) {
ts.RunTestTypeAndCharsetOfSendLongData(t)
}

func TestIssue53634(t *testing.T) {
ts := servertestkit.CreateTidbTestSuiteWithDDLLease(t, "20s")
ts.RunTestIssue53634(t, ts.Domain)
}

func TestAuthSocket(t *testing.T) {
defer server2.ClearOSUserForAuthSocket()

Expand Down
2 changes: 2 additions & 0 deletions pkg/server/tests/servertestkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ go_library(
"//pkg/util/cpuprofile",
"//pkg/util/topsql/collector/mock",
"//pkg/util/topsql/state",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
"@io_opencensus_go//stats/view",
"@org_uber_go_zap//:zap",
],
)
32 changes: 31 additions & 1 deletion pkg/server/tests/servertestkit/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"database/sql"
"sync"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
Expand All @@ -35,6 +37,7 @@ import (
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
)

// TidbTestSuite is a test suite for tidb
Expand All @@ -48,13 +51,37 @@ type TidbTestSuite struct {

// CreateTidbTestSuite creates a test suite for tidb
func CreateTidbTestSuite(t *testing.T) *TidbTestSuite {
cfg := newTestConfig()
return CreateTidbTestSuiteWithCfg(t, cfg)
}

// CreateTidbTestSuiteWithDDLLease creates a test suite with DDL lease for tidb.
func CreateTidbTestSuiteWithDDLLease(t *testing.T, ddlLease string) *TidbTestSuite {
cfg := newTestConfig()
cfg.Lease = ddlLease
return CreateTidbTestSuiteWithCfg(t, cfg)
}

func newTestConfig() *config.Config {
cfg := util.NewTestConfig()
cfg.Port = 0
cfg.Status.ReportStatus = true
cfg.Status.StatusPort = 0
cfg.Status.RecordDBLabel = true
cfg.Performance.TCPKeepAlive = true
return CreateTidbTestSuiteWithCfg(t, cfg)
return cfg
}

// parseDuration parses lease argument string.
func parseDuration(lease string) (time.Duration, error) {
dur, err := time.ParseDuration(lease)
if err != nil {
dur, err = time.ParseDuration(lease + "s")
}
if err != nil || dur < 0 {
return 0, errors.Newf("invalid lease duration", zap.String("lease", lease))
}
return dur, nil
}

// CreateTidbTestSuiteWithCfg creates a test suite for tidb with config
Expand All @@ -66,6 +93,9 @@ func CreateTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *TidbTestSuite
ts.Store, err = mockstore.NewMockStore()
session.DisableStats4Test()
require.NoError(t, err)
ddlLeaseDuration, err := parseDuration(cfg.Lease)
require.NoError(t, err)
session.SetSchemaLease(ddlLeaseDuration)
ts.Domain, err = session.BootstrapSession(ts.Store)
require.NoError(t, err)
ts.Tidbdrv = srv.NewTiDBDriver(ts.Store)
Expand Down