Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: remove pause all jobs when upgrading (#46234) #46265

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,30 +1173,6 @@ func syncUpgradeState(s Session) {
time.Sleep(interval)
}

retryTimes = 60
interval = 500 * time.Millisecond
for i := 0; i < retryTimes; i++ {
jobErrs, err := ddl.PauseAllJobsBySystem(s)
if err == nil && len(jobErrs) == 0 {
break
}
jobErrStrs := make([]string, 0, len(jobErrs))
for _, jobErr := range jobErrs {
if dbterror.ErrPausedDDLJob.Equal(jobErr) {
continue
}
jobErrStrs = append(jobErrStrs, jobErr.Error())
}
if err == nil && len(jobErrStrs) == 0 {
break
}

if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
}
logutil.BgLogger().Warn("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
time.Sleep(interval)
}
logutil.BgLogger().Info("[upgrading] update global state to upgrading", zap.String("state", syncer.StateUpgrading))
}

Expand Down
37 changes: 27 additions & 10 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.R
return rows, nil
}

// TestUpgradeWithPauseDDL adds DDL operations on both a user DB and a system DB before every test bootstrap (DDL operation). It tests:
//
//  1. Before and after each test bootstrap, the DDLs of the user DB are paused, but the DDLs of the system DB are not paused.
//  2. User DDLs are handled after system DDLs.
func TestUpgradeWithPauseDDL(t *testing.T) {
session.SupportUpgradeStateVer--
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
Expand All @@ -473,6 +477,12 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table test.pause_user_ddl_t(a int, b int)")
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table mysql.pause_user_ddl_t(a int, b int)")
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table test.pause_user_ddl_t1(a int, b int)")
require.NoError(t, err)
_, err = execute(context.Background(), se, "create table mysql.pause_user_ddl_t1(a int, b int)")
require.NoError(t, err)

tc := session.TestCallback{Cnt: atomicutil.NewInt32(0)}
sql := "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) order by processing desc, job_id"
Expand All @@ -483,16 +493,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
}

wg := sync.WaitGroup{}
tc.OnBootstrapExported = func(s session.Session) {
var query string
switch tc.Cnt.Load() % 2 {
case 0:
query = fmt.Sprintf("alter table test.pause_user_ddl_t add index idx_%d(a)", tc.Cnt.Load())
case 1:
query = fmt.Sprintf("alter table test.pause_user_ddl_t add column c_%d int", tc.Cnt.Load())
}
tc.Cnt.Add(1)

asyncExecDDL := func(query string) {
ch := make(chan struct{})
wg.Add(1)
go func() {
Expand All @@ -507,6 +508,22 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
}
}()
<-ch
}
// Before every test bootstrap (DDL operation), we add one DDL operation on a user DB and one on a system DB.
tc.OnBootstrapExported = func(s session.Session) {
var query1, query2 string
switch tc.Cnt.Load() % 2 {
case 0:
query1 = fmt.Sprintf("alter table mysql.pause_user_ddl_t add index idx_%d(a)", tc.Cnt.Load())
query2 = fmt.Sprintf("alter table test.pause_user_ddl_t add column c_%d int", tc.Cnt.Load())
case 1:
// Make sure case 0 and case 1 use different tables (and thus different table IDs); otherwise case 1's jobs would be filtered out for sharing a table ID with case 0's jobs.
query1 = fmt.Sprintf("alter table test.pause_user_ddl_t1 add index idx_%d(a)", tc.Cnt.Load())
query2 = fmt.Sprintf("alter table mysql.pause_user_ddl_t1 add column c_%d int", tc.Cnt.Load())
}
tc.Cnt.Add(1)
asyncExecDDL(query1)
asyncExecDDL(query2)

rows, err := execute(context.Background(), s, sql)
require.NoError(t, err)
Expand Down