diff --git a/ddl/job_table.go b/ddl/job_table.go index 12f52b1bc06cc..1f6ca6286057b 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -170,14 +170,14 @@ func hasSysDB(job *model.Job) bool { func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) { if d.stateSyncer.IsUpgradingState() { + if job.IsPaused() { + return false, nil + } // We need to turn the 'pausing' job to be 'paused' in ddl worker, // and stop the reorganization workers if job.IsPausing() || hasSysDB(job) { return true, nil } - if job.IsPaused() { - return false, nil - } var errs []error // During binary upgrade, pause all running DDL jobs errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID}) @@ -199,7 +199,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun return false, nil } - if job.IsPausedBySystem() && !hasSysDB(job) { + if job.IsPausedBySystem() { var errs []error errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID}) if len(errs) > 0 { @@ -549,8 +549,7 @@ func job2UniqueIDs(job *model.Job, schema bool) string { } func job2SchemaNames(job *model.Job) []string { - switch job.Type { - case model.ActionRenameTable: + if job.Type == model.ActionRenameTable { var oldSchemaID int64 var oldSchemaName model.CIStr var tableName model.CIStr @@ -561,11 +560,9 @@ func job2SchemaNames(job *model.Job) []string { names = append(names, strings.ToLower(job.SchemaName)) names = append(names, oldSchemaName.O) return names - case model.ActionRenameTables: - // TODO: Get this action's schema names. - case model.ActionExchangeTablePartition: - // TODO: Get this action's schema names. } + // TODO: handle model.ActionRenameTables and model.ActionExchangeTablePartition, which also need to get their schema names. 
+ return []string{job.SchemaName} } diff --git a/session/bootstrap.go b/session/bootstrap.go index 3aed045e15513..4a8be216541dd 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1148,7 +1148,8 @@ func upgrade(s Session) { } func syncUpgradeState(s Session) { - ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + totalInterval := time.Duration(internalSQLTimeout) * time.Second + ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval) defer cancelFunc() dom := domain.GetDomain(s) err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading)) @@ -1156,8 +1157,8 @@ func syncUpgradeState(s Session) { logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) } - retryTimes := 10 interval := 200 * time.Millisecond + retryTimes := int(totalInterval / interval) for i := 0; i < retryTimes; i++ { op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap") if err == nil && op.String() == owner.OpGetUpgradingState.String() { @@ -1166,7 +1167,9 @@ func syncUpgradeState(s Session) { if i == retryTimes-1 { logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) } - logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + if i%10 == 0 { + logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + } time.Sleep(interval) } diff --git a/session/bootstraptest/BUILD.bazel b/session/bootstraptest/BUILD.bazel index 92b175ed1e4a9..cedd8d3d1d49a 100644 --- a/session/bootstraptest/BUILD.bazel +++ b/session/bootstraptest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 8, + shard_count = 9, deps = [ "//config", "//ddl", diff --git a/session/bootstraptest/bootstrap_upgrade_test.go 
b/session/bootstraptest/bootstrap_upgrade_test.go index 60a8954e2f9c8..13bc91035496d 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -342,6 +342,9 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { require.Equal(t, session.CurrentBootstrapVersion, ver) // Resume the DDL job, then add index operation can be executed successfully. + session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID)) + checkDDLJobExecSucc(t, seLatestV, jobID) + session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID)) sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID) // Make sure the add index operation is successful. @@ -362,6 +365,86 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { require.True(t, suc) } +// checkDDLJobExecSucc is used to make sure the DDL operation is successful. +func checkDDLJobExecSucc(t *testing.T, se session.Session, jobID int64) { + sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID) + suc := false + for i := 0; i < 20; i++ { + rows, err := execute(context.Background(), se, sql) + require.NoError(t, err) + require.Len(t, rows, 1) + require.Equal(t, rows[0].GetString(2), "upgrade_tbl") + + state := rows[0].GetString(11) + if state == "synced" { + suc = true + break + } + time.Sleep(time.Millisecond * 200) + } + require.True(t, suc) +} + +// TestUpgradeVersionForSystemPausedJob mocks a failed first upgrade that left a mock system DDL job in the queue. +// Then we re-upgrade (this operation will pause all DDL jobs by the system). +func TestUpgradeVersionForSystemPausedJob(t *testing.T) { + // Mock a general and a reorg job in bootstrap. 
+ *session.WithMockUpgrade = true + session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest + + store, dom := session.CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + seV := session.CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1)) + session.UnsetStoreBootstrapped(store.UUID()) + ver, err := session.GetBootstrapVersion(seV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion-1, ver) + + // Add a paused DDL job before upgrade. + session.MustExec(t, seV, "create table mysql.upgrade_tbl(a int)") + ch := make(chan struct{}) + var jobID int64 + hook := &callback.TestDDLCallback{} + hook.OnJobRunAfterExported = func(job *model.Job) { + if job.SchemaState == model.StateDeleteOnly { + se := session.CreateSessionAndSetID(t, store) + session.MustExec(t, se, fmt.Sprintf("admin pause ddl jobs %d", job.ID)) + } + if job.State == model.JobStatePaused && jobID == 0 { + // Mock pause the ddl job by system. + job.AdminOperator = model.AdminCommandBySystem + ch <- struct{}{} + jobID = job.ID + } + } + dom.DDL().SetHook(hook) + go func() { + _, err = execute(context.Background(), seV, "alter table mysql.upgrade_tbl add column b int") + }() + + <-ch + dom.Close() + // Make sure upgrade is successful. 
+ domLatestV, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domLatestV.Close() + seLatestV := session.CreateSessionAndSetID(t, store) + ver, err = session.GetBootstrapVersion(seLatestV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion+1, ver) + + checkDDLJobExecSucc(t, seLatestV, jobID) +} + func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.Row, error) { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) rs, err := s.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query) diff --git a/session/mock_bootstrap.go b/session/mock_bootstrap.go index df7f4f582dd1f..cfad1f8fa24e4 100644 --- a/session/mock_bootstrap.go +++ b/session/mock_bootstrap.go @@ -112,6 +112,21 @@ func mockUpgradeToVerLatest(s Session, ver int64) { TestHook.OnBootstrapAfter(s) } +// mockSimpleUpgradeToVerLatest mocks a simple bootstrapVersion (to make the test faster). +func mockSimpleUpgradeToVerLatest(s Session, ver int64) { + logutil.BgLogger().Info("mock upgrade to ver latest", zap.Int64("old ver", ver), zap.Int64("mock latest ver", mockLatestVer)) + if ver >= mockLatestVer { + return + } + mustExecute(s, "use mysql") + mustExecute(s, `create table if not exists mock_sys_t( + c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1) + );`) + mustExecute(s, "alter table mock_sys_t add column mayNullCol bigint default 1") + mustExecute(s, "alter table mock_sys_t add index idx_c2(c2)") + TestHook.OnBootstrapAfter(s) +} + // TestHook is exported for testing. var TestHook = TestCallback{} @@ -140,13 +155,26 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 { return ver } +const ( + defaultMockUpgradeToVerLatest = 0 + // MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion, which is just a few simple DDL operations. + MockSimpleUpgradeToVerLatest = 1 +) + +// MockUpgradeToVerLatestKind is used to indicate the use of different mock bootstrapVersions. 
+var MockUpgradeToVerLatestKind = defaultMockUpgradeToVerLatest + func addMockBootstrapVersionForTest(s Session) { if !*WithMockUpgrade { return } TestHook.OnBootstrapBefore(s) - bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest) + if MockUpgradeToVerLatestKind == defaultMockUpgradeToVerLatest { + bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest) + } else { + bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest) + } currentBootstrapVersion++ }