Skip to content

Commit

Permalink
session: remove pause all jobs when upgrading (#46234)
Browse files Browse the repository at this point in the history
close #46227, close #46228
  • Loading branch information
zimulala authored Aug 21, 2023
1 parent ab4d6ad commit 8c1d2c8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 35 deletions.
25 changes: 0 additions & 25 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,31 +1282,6 @@ func SyncUpgradeState(s Session) error {
time.Sleep(interval)
}

retryTimes = 60
interval = 500 * time.Millisecond
for i := 0; i < retryTimes; i++ {
jobErrs, err := ddl.PauseAllJobsBySystem(s)
if err == nil && len(jobErrs) == 0 {
break
}
jobErrStrs := make([]string, 0, len(jobErrs))
for _, jobErr := range jobErrs {
if dbterror.ErrPausedDDLJob.Equal(jobErr) {
continue
}
jobErrStrs = append(jobErrStrs, jobErr.Error())
}
if err == nil && len(jobErrStrs) == 0 {
break
}

if i == retryTimes-1 {
logutil.BgLogger().Error("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err))
return err
}
logutil.BgLogger().Warn("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err))
time.Sleep(interval)
}
logutil.BgLogger().Info("update global state to upgrading", zap.String("category", "upgrading"), zap.String("state", syncer.StateUpgrading))
return nil
}
Expand Down
37 changes: 27 additions & 10 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.R
return rows, nil
}

// TestUpgradeWithPauseDDL adds one DDL operation on a user DB and one on a system DB before every test bootstrap (DDL operation). It tests:
//
// 1. Before and after each test bootstrap, the DDL of the user DB is paused, but the DDL of the system DB is not paused.
// 2. User DDLs are handled after system DDLs.
func TestUpgradeWithPauseDDL(t *testing.T) {
session.SupportUpgradeStateVer--
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
Expand All @@ -564,6 +568,12 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table test.pause_user_ddl_t(a int, b int)")
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table mysql.pause_user_ddl_t(a int, b int)")
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table test.pause_user_ddl_t1(a int, b int)")
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table mysql.pause_user_ddl_t1(a int, b int)")
require.NoError(t, err)

tc := session.TestCallback{Cnt: atomicutil.NewInt32(0)}
sql := "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) order by processing desc, job_id"
Expand All @@ -574,16 +584,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
}

wg := sync.WaitGroup{}
tc.OnBootstrapExported = func(s session.Session) {
var query string
switch tc.Cnt.Load() % 2 {
case 0:
query = fmt.Sprintf("alter table test.pause_user_ddl_t add index idx_%d(a)", tc.Cnt.Load())
case 1:
query = fmt.Sprintf("alter table test.pause_user_ddl_t add column c_%d int", tc.Cnt.Load())
}
tc.Cnt.Add(1)

asyncExecDDL := func(query string) {
ch := make(chan struct{})
wg.Add(1)
go func() {
Expand All @@ -598,6 +599,22 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
}
}()
<-ch
}
// Before every test bootstrap (DDL operation), we add one DDL operation on a user DB and one on a system DB.
tc.OnBootstrapExported = func(s session.Session) {
var query1, query2 string
switch tc.Cnt.Load() % 2 {
case 0:
query1 = fmt.Sprintf("alter table mysql.pause_user_ddl_t add index idx_%d(a)", tc.Cnt.Load())
query2 = fmt.Sprintf("alter table test.pause_user_ddl_t add column c_%d int", tc.Cnt.Load())
case 1:
// Make sure case 0 and case 1 use different table IDs. Otherwise case 1's jobs would be filtered out of the check query because they share a table ID with case 0's jobs.
query1 = fmt.Sprintf("alter table test.pause_user_ddl_t1 add index idx_%d(a)", tc.Cnt.Load())
query2 = fmt.Sprintf("alter table mysql.pause_user_ddl_t1 add column c_%d int", tc.Cnt.Load())
}
tc.Cnt.Add(1)
asyncExecDDL(query1)
asyncExecDDL(query2)

rows, err := execute(context.Background(), s, sql)
require.NoError(t, err)
Expand Down

0 comments on commit 8c1d2c8

Please sign in to comment.