Skip to content

Commit

Permalink
ddl, session: after upgrades, the order of processing on different ta…
Browse files Browse the repository at this point in the history
…bles is also more as expected (#44305)

close #44304
  • Loading branch information
zimulala committed Jun 5, 2023
1 parent 2cce0ea commit 47778c5
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 2 deletions.
3 changes: 2 additions & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
if job.IsPausedBySystem() && !hasSysDB(job) {
var errs []error
errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID})
if len(errs) > 0 {
if len(errs) > 0 && errs[0] != nil {
logutil.BgLogger().Warn("[ddl-upgrading] normal cluster state, resume the job failed", zap.Stringer("job", job), zap.Error(errs[0]))
return false, errs[0]
}
Expand All @@ -212,6 +212,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
return false, err
}
logutil.BgLogger().Warn("[ddl-upgrading] normal cluster state, resume the job successfully", zap.Stringer("job", job))
return false, errors.Errorf("system paused job:%d need to be resumed", job.ID)
}

if job.IsPaused() {
Expand Down
10 changes: 10 additions & 0 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
Expand Down Expand Up @@ -1217,6 +1218,15 @@ func syncUpgradeState(s Session) {
}

func syncNormalRunning(s Session) {
failpoint.Inject("mockResumeAllJobsFailed", func(val failpoint.Value) {
if val.(bool) {
dom := domain.GetDomain(s)
//nolint: errcheck
dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), syncer.NewStateInfo(syncer.StateNormalRunning))
failpoint.Return()
}
})

jobErrs, err := ddl.ResumeAllJobsBySystem(s)
if err != nil {
logutil.BgLogger().Warn("[upgrading] resume all paused jobs failed", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
"//config",
"//ddl",
Expand All @@ -26,6 +26,7 @@ go_test(
"//util/chunk",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require", #keep
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_atomic//:atomic",
Expand Down
110 changes: 110 additions & 0 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/util/callback"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -362,6 +363,115 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
require.True(t, suc)
}

// TestUpgradeVersionForResumeJob checks the processing order of DDL jobs on
// different tables after an upgrade during which the
// "mockResumeAllJobsFailed" failpoint is enabled.
//
// The test rolls the recorded bootstrap version back by one, starts an
// "add index" job on test.upgrade_tbl, closes the domain once a job reaches
// the write-only state, and bootstraps again. It then submits a second
// "add index" job on test.upgrade_tbl1 and asserts that both jobs end up in
// the synced state and that the first job finishes before the second one.
func TestUpgradeVersionForResumeJob(t *testing.T) {
	store, dom := session.CreateStoreAndBootstrap(t)
	defer func() { require.NoError(t, store.Close()) }()

	// Record the previous bootstrap version in both the meta and mysql.tidb,
	// then verify the session reports that older version.
	seV := session.CreateSessionAndSetID(t, store)
	txn, err := store.Begin()
	require.NoError(t, err)
	m := meta.NewMeta(txn)
	err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1)
	require.NoError(t, err)
	err = txn.Commit(context.Background())
	require.NoError(t, err)
	session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1))
	session.UnsetStoreBootstrapped(store.UUID())
	ver, err := session.GetBootstrapVersion(seV)
	require.NoError(t, err)
	require.Equal(t, session.CurrentBootstrapVersion-1, ver)

	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/mockResumeAllJobsFailed", `return(true)`))
	// Surface a failure to disable the failpoint instead of dropping the error,
	// so a failpoint that stays enabled does not go unnoticed.
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/mockResumeAllJobsFailed"))
	}()

	// Add a paused DDL job before upgrade.
	session.MustExec(t, seV, "create table test.upgrade_tbl(a int, b int)")
	session.MustExec(t, seV, "create table test.upgrade_tbl1(a int, b int)")
	// ch is unbuffered: the hook blocks on its send until the test body
	// receives from it below.
	ch := make(chan struct{})
	hook := &callback.TestDDLCallback{}
	var jobID int64
	doOnce := true
	// Once the first job ID is known, poll (at most 50 times, 100ms apart)
	// until both jobID and jobID+1 are visible in "admin show ddl jobs".
	hook.OnGetJobBeforeExported = func(str string) {
		if jobID == 0 || !doOnce {
			return
		}

		// The query and the session do not change between polls, so build them once.
		sql := fmt.Sprintf("admin show ddl jobs where job_id=%d or job_id=%d", jobID, jobID+1)
		se := session.CreateSessionAndSetID(t, store)
		for i := 0; i < 50; i++ {
			rows, err := execute(context.Background(), se, sql)
			require.NoError(t, err)
			if len(rows) == 2 {
				doOnce = false
				break
			}
			time.Sleep(100 * time.Millisecond)
		}
	}
	var wg sync.WaitGroup
	wg.Add(1)
	// times tracks the observed progress:
	//   0 -> 1: a job reached the write-only state (its ID is recorded in jobID);
	//   1 -> 2: job jobID was seen queueing in the write-reorganization state;
	//   2 -> 3: job jobID+1 was seen queueing in the none state.
	times := 0
	hook.OnGetJobAfterExported = func(tp string, job *model.Job) {
		if job.SchemaState == model.StateWriteOnly && times == 0 {
			ch <- struct{}{}
			jobID = job.ID
			times = 1
		}
		// Make sure we do jobID first, then do jobID+1.
		if job.ID == jobID && job.SchemaState == model.StateWriteReorganization && job.State == model.JobStateQueueing && times == 1 {
			times = 2
		}
		if job.ID == jobID+1 && job.SchemaState == model.StateNone && job.State == model.JobStateQueueing && times == 2 {
			times = 3
		}
		if job.ID == jobID && job.State == model.JobStateDone && job.SchemaState == model.StatePublic {
			wg.Done()
		}
	}

	dom.DDL().SetHook(hook)
	go func() {
		// This "add index" job will be paused when upgrading.
		// Its result is intentionally ignored: the domain is closed while it runs.
		_, _ = execute(context.Background(), seV, "alter table test.upgrade_tbl add index idx(a)")
	}()

	// Wait until the hook reports a job in the write-only state, then close the old domain.
	<-ch
	dom.Close()
	// Make sure upgrade is successful.
	domLatestV, err := session.BootstrapSession(store)
	require.NoError(t, err)
	defer domLatestV.Close()
	domLatestV.DDL().SetHook(hook)
	seLatestV := session.CreateSessionAndSetID(t, store)
	// Add a new DDL (an "add index" job uses a different table than the previous DDL job) to the DDL table.
	session.MustExec(t, seLatestV, "alter table test.upgrade_tbl1 add index idx2(a)")
	ver, err = session.GetBootstrapVersion(seLatestV)
	require.NoError(t, err)
	require.Equal(t, session.CurrentBootstrapVersion, ver)

	// Wait for the first job to be observed as done and public.
	wg.Wait()
	require.Equal(t, 3, times)
	// Make sure the second add index operation is successful.
	sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id=%d or job_id=%d order by job_id", jobID, jobID+1)
	rows, err := execute(context.Background(), seLatestV, sql)
	require.NoError(t, err)
	require.Len(t, rows, 2)
	var idxFinishTS uint64
	for i, row := range rows {
		jobBinary := row.GetBytes(0)
		runJob := model.Job{}
		require.NoError(t, runJob.Decode(jobBinary))
		require.Contains(t, runJob.TableName, "upgrade_tbl")
		require.Equal(t, model.JobStateSynced.String(), runJob.State.String())
		if i == 0 {
			// Rows are ordered by job_id, so the first row is the earlier job.
			idxFinishTS = runJob.BinlogInfo.FinishedTS
		} else {
			require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
		}
	}
}

func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.Row, error) {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
rs, err := s.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query)
Expand Down

0 comments on commit 47778c5

Please sign in to comment.