Skip to content

Commit

Permalink
ddl, session: fix re-upgrade issues (#44469) (#44582)
Browse files Browse the repository at this point in the history
close #44158
  • Loading branch information
ti-chi-bot authored Jul 12, 2023
1 parent eb4e931 commit 5af21fd
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 15 deletions.
17 changes: 7 additions & 10 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func hasSysDB(job *model.Job) bool {

func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) {
if d.stateSyncer.IsUpgradingState() {
if job.IsPaused() {
return false, nil
}
// We need to turn the 'pausing' job to be 'paused' in ddl worker,
// and stop the reorganization workers
if job.IsPausing() || hasSysDB(job) {
return true, nil
}
if job.IsPaused() {
return false, nil
}
var errs []error
// During binary upgrade, pause all running DDL jobs
errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID})
Expand All @@ -199,7 +199,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
return false, nil
}

if job.IsPausedBySystem() && !hasSysDB(job) {
if job.IsPausedBySystem() {
var errs []error
errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID})
if len(errs) > 0 {
Expand Down Expand Up @@ -549,8 +549,7 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
}

func job2SchemaNames(job *model.Job) []string {
switch job.Type {
case model.ActionRenameTable:
if job.Type == model.ActionRenameTable {
var oldSchemaID int64
var oldSchemaName model.CIStr
var tableName model.CIStr
Expand All @@ -561,11 +560,9 @@ func job2SchemaNames(job *model.Job) []string {
names = append(names, strings.ToLower(job.SchemaName))
names = append(names, oldSchemaName.O)
return names
case model.ActionRenameTables:
// TODO: Get this action's schema names.
case model.ActionExchangeTablePartition:
// TODO: Get this action's schema names.
}
// TODO: consider about model.ActionRenameTables and model.ActionExchangeTablePartition, which need to get the schema names.

return []string{job.SchemaName}
}

Expand Down
9 changes: 6 additions & 3 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,16 +1148,17 @@ func upgrade(s Session) {
}

func syncUpgradeState(s Session) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
totalInterval := time.Duration(internalSQLTimeout) * time.Second
ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval)
defer cancelFunc()
dom := domain.GetDomain(s)
err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading))
if err != nil {
logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err))
}

retryTimes := 10
interval := 200 * time.Millisecond
retryTimes := int(totalInterval / interval)
for i := 0; i < retryTimes; i++ {
op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap")
if err == nil && op.String() == owner.OpGetUpgradingState.String() {
Expand All @@ -1166,7 +1167,9 @@ func syncUpgradeState(s Session) {
if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
}
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
if i%10 == 0 {
logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
}
time.Sleep(interval)
}

Expand Down
2 changes: 1 addition & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
"//config",
"//ddl",
Expand Down
83 changes: 83 additions & 0 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
require.Equal(t, session.CurrentBootstrapVersion, ver)

// Resume the DDL job, then add index operation can be executed successfully.
session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID))
checkDDLJobExecSucc(t, seLatestV, jobID)

session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID))
sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID)
// Make sure the add index operation is successful.
Expand All @@ -362,6 +365,86 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
require.True(t, suc)
}

// checkDDLJobExecSucc is used to make sure the DDL operation is successful.
func checkDDLJobExecSucc(t *testing.T, se session.Session, jobID int64) {
sql := fmt.Sprintf(" admin show ddl jobs where job_id=%d", jobID)
suc := false
for i := 0; i < 20; i++ {
rows, err := execute(context.Background(), se, sql)
require.NoError(t, err)
require.Len(t, rows, 1)
require.Equal(t, rows[0].GetString(2), "upgrade_tbl")

state := rows[0].GetString(11)
if state == "synced" {
suc = true
break
}
time.Sleep(time.Millisecond * 200)
}
require.True(t, suc)
}

// TestUpgradeVersionForSystemPausedJob tests mock the first upgrade failed, and it has a mock system DDL in queue.
// Then we do re-upgrade(This operation will pause all DDL jobs by the system).
func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
// Mock a general and a reorg job in boostrap.
*session.WithMockUpgrade = true
session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest

store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

seV := session.CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1))
session.UnsetStoreBootstrapped(store.UUID())
ver, err := session.GetBootstrapVersion(seV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)

// Add a paused DDL job before upgrade.
session.MustExec(t, seV, "create table mysql.upgrade_tbl(a int)")
ch := make(chan struct{})
var jobID int64
hook := &callback.TestDDLCallback{}
hook.OnJobRunAfterExported = func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly {
se := session.CreateSessionAndSetID(t, store)
session.MustExec(t, se, fmt.Sprintf("admin pause ddl jobs %d", job.ID))
}
if job.State == model.JobStatePaused && jobID == 0 {
// Mock pause the ddl job by system.
job.AdminOperator = model.AdminCommandBySystem
ch <- struct{}{}
jobID = job.ID
}
}
dom.DDL().SetHook(hook)
go func() {
_, err = execute(context.Background(), seV, "alter table mysql.upgrade_tbl add column b int")
}()

<-ch
dom.Close()
// Make sure upgrade is successful.
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion+1, ver)

checkDDLJobExecSucc(t, seLatestV, jobID)
}

func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.Row, error) {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
rs, err := s.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query)
Expand Down
30 changes: 29 additions & 1 deletion session/mock_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ func mockUpgradeToVerLatest(s Session, ver int64) {
TestHook.OnBootstrapAfter(s)
}

// mockSimpleUpgradeToVerLatest mocks a simple bootstrapVersion(make the test faster).
func mockSimpleUpgradeToVerLatest(s Session, ver int64) {
logutil.BgLogger().Info("mock upgrade to ver latest", zap.Int64("old ver", ver), zap.Int64("mock latest ver", mockLatestVer))
if ver >= mockLatestVer {
return
}
mustExecute(s, "use mysql")
mustExecute(s, `create table if not exists mock_sys_t(
c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1)
);`)
mustExecute(s, "alter table mock_sys_t add column mayNullCol bigint default 1")
mustExecute(s, "alter table mock_sys_t add index idx_c2(c2)")
TestHook.OnBootstrapAfter(s)
}

// TestHook is exported for testing.
var TestHook = TestCallback{}

Expand Down Expand Up @@ -140,13 +155,26 @@ func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 {
return ver
}

const (
defaultMockUpgradeToVerLatest = 0
// MockSimpleUpgradeToVerLatest is used to indicate the use of the simple mock bootstrapVersion, this is just a few simple DDL operations.
MockSimpleUpgradeToVerLatest = 1
)

// MockUpgradeToVerLatestKind is used to indicate the use of different mock bootstrapVersion.
var MockUpgradeToVerLatestKind = defaultMockUpgradeToVerLatest

func addMockBootstrapVersionForTest(s Session) {
if !*WithMockUpgrade {
return
}

TestHook.OnBootstrapBefore(s)
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
if MockUpgradeToVerLatestKind == defaultMockUpgradeToVerLatest {
bootstrapVersion = append(bootstrapVersion, mockUpgradeToVerLatest)
} else {
bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest)
}
currentBootstrapVersion++
}

Expand Down

0 comments on commit 5af21fd

Please sign in to comment.