Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, session: fix re-upgrade issues (#44469) #44582

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
load("@bazel_gazelle//:def.bzl", "gazelle")

# gazelle:proto disable_global
Expand All @@ -14,3 +15,18 @@ xcode_version(
name = "xcode_version",
version = "10.0",
)

go_library(
name = "tidb_lib",
srcs = ["tidb_create.go"],
importpath = "github.com/pingcap/tidb",
deps = [
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_ngaut_log//:go_default_library",
],
)

go_binary(
name = "tidb",
embed = [":tidb_lib"],
)
17 changes: 7 additions & 10 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func hasSysDB(job *model.Job) bool {

func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) {
if d.stateSyncer.IsUpgradingState() {
if job.IsPaused() {
return false, nil
}
// We need to turn the 'pausing' job to be 'paused' in ddl worker,
// and stop the reorganization workers
if job.IsPausing() || hasSysDB(job) {
return true, nil
}
if job.IsPaused() {
return false, nil
}
var errs []error
// During binary upgrade, pause all running DDL jobs
errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID})
Expand All @@ -199,7 +199,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
return false, nil
}

if job.IsPausedBySystem() && !hasSysDB(job) {
if job.IsPausedBySystem() {
var errs []error
errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID})
if len(errs) > 0 {
Expand Down Expand Up @@ -545,8 +545,7 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
}

func job2SchemaNames(job *model.Job) []string {
switch job.Type {
case model.ActionRenameTable:
if job.Type == model.ActionRenameTable {
var oldSchemaID int64
var oldSchemaName model.CIStr
var tableName model.CIStr
Expand All @@ -557,11 +556,9 @@ func job2SchemaNames(job *model.Job) []string {
names = append(names, strings.ToLower(job.SchemaName))
names = append(names, oldSchemaName.O)
return names
case model.ActionRenameTables:
// TODO: Get this action's schema names.
case model.ActionExchangeTablePartition:
// TODO: Get this action's schema names.
}
// TODO: consider about model.ActionRenameTables and model.ActionExchangeTablePartition, which need to get the schema names.

return []string{job.SchemaName}
}

Expand Down
9 changes: 6 additions & 3 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,16 +1148,17 @@ func upgrade(s Session) {
}

func syncUpgradeState(s Session) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
totalInterval := time.Duration(internalSQLTimeout) * time.Second
ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval)
defer cancelFunc()
dom := domain.GetDomain(s)
err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading))
if err != nil {
logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err))
}

retryTimes := 10
interval := 200 * time.Millisecond
retryTimes := int(totalInterval / interval)
for i := 0; i < retryTimes; i++ {
op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap")
if err == nil && op.String() == owner.OpGetUpgradingState.String() {
Expand All @@ -1166,7 +1167,9 @@ func syncUpgradeState(s Session) {
if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
}
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
if i%10 == 0 {
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
}
time.Sleep(interval)
}

Expand Down
2 changes: 1 addition & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//config",
"//ddl",
Expand Down
80 changes: 80 additions & 0 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,86 @@ func TestUpgradeVersionMockLatest(t *testing.T) {
" PARTITION `p4` VALUES LESS THAN (7096))"))
}

// checkDDLJobExecSucc is used to make sure the DDL operation is successful.
func checkDDLJobExecSucc(t *testing.T, se session.Session, jobID int64) {
sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID)
suc := false
for i := 0; i < 20; i++ {
rows, err := execute(context.Background(), se, sql)
require.NoError(t, err)
require.Len(t, rows, 1)
require.Equal(t, rows[0].GetString(2), "upgrade_tbl")

state := rows[0].GetString(11)
if state == "synced" {
suc = true
break
}
time.Sleep(time.Millisecond * 200)
}
require.True(t, suc)
}

// TestUpgradeVersionForSystemPausedJob tests mock the first upgrade failed, and it has a mock system DDL in queue.
// Then we do re-upgrade(This operation will pause all DDL jobs by the system).
func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
// Mock a general and a reorg job in boostrap.
*session.WithMockUpgrade = true
session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest

store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

seV := session.CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1))
session.UnsetStoreBootstrapped(store.UUID())
ver, err := session.GetBootstrapVersion(seV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)

// Add a paused DDL job before upgrade.
session.MustExec(t, seV, "create table mysql.upgrade_tbl(a int)")
ch := make(chan struct{})
var jobID int64
hook := &callback.TestDDLCallback{}
hook.OnJobRunAfterExported = func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly {
se := session.CreateSessionAndSetID(t, store)
session.MustExec(t, se, fmt.Sprintf("admin pause ddl jobs %d", job.ID))
}
if job.State == model.JobStatePaused && jobID == 0 {
// Mock pause the ddl job by system.
job.AdminOperator = model.AdminCommandBySystem
ch <- struct{}{}
jobID = job.ID
}
}
dom.DDL().SetHook(hook)
go func() {
_, err = execute(context.Background(), seV, "alter table mysql.upgrade_tbl add column b int")
}()

<-ch
dom.Close()
// Make sure upgrade is successful.
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion+1, ver)

checkDDLJobExecSucc(t, seLatestV, jobID)
}

func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.Row, error) {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
rs, err := s.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query)
Expand Down
30 changes: 29 additions & 1 deletion session/mock_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ func mockUpgradeToVerLatest(s Session, ver int64) {
TestHook.OnBootstrapAfter(s)
}

// mockSimpleUpgradeToVerLatest mocks a simple bootstrapVersion(make the test faster).
func mockSimpleUpgradeToVerLatest(s Session, ver int64) {
logutil.BgLogger().Info("mock upgrade to ver latest", zap.Int64("old ver", ver), zap.Int64("mock latest ver", mockLatestVer))
if ver >= mockLatestVer {
return
}
mustExecute(s, "use mysql")
mustExecute(s, `create table if not exists mock_sys_t(
c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1)
);`)
mustExecute(s, "alter table mock_sys_t add column mayNullCol bigint default 1")
mustExecute(s, "alter table mock_sys_t add index idx_c2(c2)")
TestHook.OnBootstrapAfter(s)
}

// TestHook is exported for testing.
var TestHook = TestCallback{}

Expand Down Expand Up @@ -140,13 +155,26 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 {
return ver
}

const (
defaultMockUpgradeToVerLatest = 0
// MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion, this is just a few simple DDL operations.
MockSimpleUpgradeToVerLatest = 1
)

// MockUpgradeToVerLatestKind is used to indicate the use of different mock bootstrapVersion.
var MockUpgradeToVerLatestKind = defaultMockUpgradeToVerLatest

func addMockBootstrapVersionForTest(s Session) {
if !*WithMockUpgrade {
return
}

TestHook.OnBootstrapBefore(s)
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
if MockUpgradeToVerLatestKind == defaultMockUpgradeToVerLatest {
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
} else {
bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest)
}
currentBootstrapVersion++
}

Expand Down