diff --git a/ddl/cluster.go b/ddl/cluster.go
index 8483ca433d7ff..a6f80991ca0c9 100644
--- a/ddl/cluster.go
+++ b/ddl/cluster.go
@@ -383,39 +383,22 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
 	switch job.SchemaState {
 	// Stage 1, check and set FlashbackClusterJobID, and save the PD schedule.
 	case model.StateNone:
-		flashbackJobID, err := t.GetFlashbackClusterJobID()
+		if err = savePDSchedule(job); err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+		readOnlyValue, err = getTiDBSuperReadOnly(sess)
 		if err != nil {
 			job.State = model.JobStateCancelled
-			return ver, err
+			return ver, errors.Trace(err)
 		}
-		if flashbackJobID == 0 || flashbackJobID == job.ID {
-			err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
-				return meta.NewMeta(txn).SetFlashbackClusterJobID(job.ID)
-			})
-			if err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			if err = savePDSchedule(job); err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			readOnlyValue, err = getTiDBSuperReadOnly(sess)
-			if err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			job.Args[readOnlyArgsOffset] = &readOnlyValue
-			gcEnableValue, err := gcutil.CheckGCEnable(sess)
-			if err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			job.Args[gcEnabledArgsOffset] = &gcEnableValue
-		} else {
+		job.Args[readOnlyArgsOffset] = &readOnlyValue
+		gcEnableValue, err := gcutil.CheckGCEnable(sess)
+		if err != nil {
 			job.State = model.JobStateCancelled
-			return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
+			return ver, errors.Trace(err)
 		}
+		job.Args[gcEnabledArgsOffset] = &gcEnableValue
 		job.SchemaState = model.StateWriteOnly
 		return ver, nil
 	// Stage 2, check flashbackTS, close GC and PD schedule.
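An aside on the Stage 1 rewrite above: with the mInFlashbackCluster meta key gone, the flashback job simply stashes the cluster state it must later restore (the tidb_super_read_only value and whether GC was enabled) into its own argument slots. The following is a minimal, self-contained sketch of that pattern; the type and offset names are hypothetical stand-ins, not TiDB's real definitions (the real code uses model.Job with readOnlyArgsOffset and gcEnabledArgsOffset):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical argument layout for illustration only; the real offsets are the
// readOnlyArgsOffset and gcEnabledArgsOffset constants in ddl/cluster.go.
const (
	flashbackTSOffset = iota
	pdScheduleOffset
	readOnlyOffset
	gcEnabledOffset
)

// flashbackJob is a stand-in for model.Job, keeping only what the sketch needs.
type flashbackJob struct {
	Args []interface{}
}

// captureClusterState records the settings that must be restored when the
// flashback job finishes or is cancelled, mirroring what Stage 1 above does.
func captureClusterState(j *flashbackJob, superReadOnly string, gcEnabled bool) {
	j.Args[readOnlyOffset] = superReadOnly
	j.Args[gcEnabledOffset] = gcEnabled
}

func main() {
	j := &flashbackJob{Args: make([]interface{}, 4)}
	j.Args[flashbackTSOffset] = uint64(0)
	j.Args[pdScheduleOffset] = map[string]interface{}{}

	captureClusterState(j, "ON", true)

	// Like model.Job's RawArgs, the args survive as JSON, so the finish path can
	// decode and restore them even after a cancel or an owner restart.
	raw, err := json.Marshal(j.Args)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // [0,{},"ON",true]
}
```

finishFlashbackCluster, shown in the next hunk, decodes the same slots via job.DecodeArgs and restores them when the job finishes or is rolled back.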
@@ -461,11 +444,15 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
 }
 
 func finishFlashbackCluster(w *worker, job *model.Job) error {
+	// Nothing was done during flashback, return directly.
+	if job.SchemaState == model.StateNone {
+		return nil
+	}
+
 	var flashbackTS uint64
 	var pdScheduleValue map[string]interface{}
 	var readOnlyValue string
 	var gcEnabled bool
-	var jobID int64
 
 	if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil {
 		return errors.Trace(err)
@@ -478,32 +465,23 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
 
 	err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
-		jobID, err = t.GetFlashbackClusterJobID()
-		if err != nil {
+		if err = recoverPDSchedule(pdScheduleValue); err != nil {
 			return err
 		}
-		if jobID == job.ID {
-			if err = recoverPDSchedule(pdScheduleValue); err != nil {
+		if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
+			return err
+		}
+		if gcEnabled {
+			if err = gcutil.EnableGC(sess); err != nil {
 				return err
 			}
-			if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
+		}
+		if job.IsDone() || job.IsSynced() {
+			gcSafePoint, err := gcutil.GetGCSafePoint(sess)
+			if err != nil {
 				return err
 			}
-			if gcEnabled {
-				if err = gcutil.EnableGC(sess); err != nil {
-					return err
-				}
-			}
-			if job.IsDone() || job.IsSynced() {
-				gcSafePoint, err := gcutil.GetGCSafePoint(sess)
-				if err != nil {
-					return err
-				}
-				if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil {
-					return err
-				}
-			}
-			if err = t.SetFlashbackClusterJobID(0); err != nil {
+			if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil {
 				return err
 			}
 		}
diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go
index 924e7e0e9fd31..92ffd4391d384 100644
--- a/ddl/cluster_test.go
+++ b/ddl/cluster_test.go
@@ -162,6 +162,44 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
 	require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
 }
 
+func TestAddDDLDuringFlashback(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	originHook := dom.DDL().GetHook()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t(a int)")
+
+	ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
+	require.NoError(t, err)
+
+	injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
+	require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
+		fmt.Sprintf("return(%v)", injectSafeTS)))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
+		fmt.Sprintf("return(%v)", injectSafeTS)))
+
+	timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
+	defer resetGC()
+	tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
+
+	hook := &ddl.TestDDLCallback{Do: dom}
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		assert.Equal(t, model.ActionFlashbackCluster, job.Type)
+		if job.SchemaState == model.StateWriteReorganization {
+			_, err := tk.Exec("alter table t add column b int")
+			assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job")
+		}
+	}
+	dom.DDL().SetHook(hook)
+	tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
+
+	dom.DDL().SetHook(originHook)
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
+	require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
+}
+
 func TestGlobalVariablesOnFlashback(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	originHook := dom.DDL().GetHook()
@@ -247,12 +285,16 @@ func TestCancelFlashbackCluster(t *testing.T) {
 	tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
 
 	// Try canceled on StateWriteOnly, cancel success
+	tk.MustExec("set global tidb_super_read_only = off")
 	hook := newCancelJobHook(t, store, dom, func(job *model.Job) bool {
 		return job.SchemaState == model.StateWriteOnly
 	})
 	dom.DDL().SetHook(hook)
 	tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
 	hook.MustCancelDone(t)
+	rs, err := tk.Exec("show variables like 'tidb_super_read_only'")
+	require.NoError(t, err)
+	require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
 
 	// Try canceled on StateWriteReorganization, cancel failed
 	hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 4bad7d5909138..2e9d66b3149f2 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -319,17 +319,20 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error {
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
 	return kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
-		jobID, err := t.GetFlashbackClusterJobID()
+		ids, err := t.GenGlobalIDs(len(tasks))
 		if err != nil {
 			return errors.Trace(err)
 		}
-		if jobID != 0 {
-			return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
-		}
-		ids, err := t.GenGlobalIDs(len(tasks))
+
+		jobs, err := t.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
 		if err != nil {
 			return errors.Trace(err)
 		}
+		for _, job := range jobs {
+			if job.Type == model.ActionFlashbackCluster {
+				return errors.Errorf("Can't add ddl job, have flashback cluster job")
+			}
+		}
 
 		for i, task := range tasks {
 			job := task.job
@@ -386,17 +389,24 @@ func setJobStateToQueueing(job *model.Job) {
 func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
 	var ids []int64
 	var err error
+
+	sess, err := d.sessPool.get()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer d.sessPool.put(sess)
+	job, err := getJobsBySQL(newSession(sess), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(job) != 0 {
+		return errors.Errorf("Can't add ddl job, have flashback cluster job")
+	}
+
 	startTS := uint64(0)
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
 	err = kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
-		jobID, err := t.GetFlashbackClusterJobID()
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if jobID != 0 {
-			return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
-		}
 		ids, err = t.GenGlobalIDs(len(tasks))
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -415,13 +425,8 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
 			jobTasks[i] = job
 			injectModifyJobArgFailPoint(job)
 		}
-		sess, err1 := d.sessPool.get()
-		if err1 == nil {
-			sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
-			err1 = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
-			d.sessPool.put(sess)
-		}
-		err = err1
+		sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
+		err = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
 	}
 	return errors.Trace(err)
 }
@@ -1130,19 +1135,6 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 	if job.Type != model.ActionMultiSchemaChange {
 		logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
 	}
-
-	// Should check flashbackClusterJobID.
-	// Some ddl jobs maybe added between check and insert into ddl job table.
-	flashbackJobID, err := t.GetFlashbackClusterJobID()
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, err
-	}
-	if flashbackJobID != 0 && flashbackJobID != job.ID {
-		job.State = model.JobStateCancelled
-		return ver, errors.Errorf("Can't do ddl job, cluster is flashing back now")
-	}
-
 	timeStart := time.Now()
 	if job.RealStartTS == 0 {
 		job.RealStartTS = t.StartTS
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 040ff971de72e..295602d64652f 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -132,7 +132,9 @@ func (d *ddl) getGeneralJob(sess *session) (*model.Job, error) {
 			return d.checkJobIsRunnable(sess, sql)
 		}
 		// Check if there is any running job works on the same table.
-		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0", job.ID)
+		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+
+			"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+
+			"or (type = %d and processing)", job.ID, model.ActionFlashbackCluster)
 		return d.checkJobIsRunnable(sess, sql)
 	})
 }
@@ -144,9 +146,12 @@ func (d *ddl) checkJobIsRunnable(sess *session, sql string) (bool, error) {
 
 func (d *ddl) getReorgJob(sess *session) (*model.Job, error) {
 	return d.getJob(sess, reorg, func(job *model.Job) (bool, error) {
-		// Check if there is any drop schema ddl running.
-		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) or (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) limit 1",
-			strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)))
+		// Check if there is any blocking DDL running, like drop schema or flashback cluster.
+		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
+			"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
+			"or (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+
+			"or (type = %d and processing) limit 1",
+			strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)), model.ActionFlashbackCluster)
 		return d.checkJobIsRunnable(sess, sql)
 	})
 }
diff --git a/meta/meta.go b/meta/meta.go
index d0884f722e70e..f132d0597d1da 100644
--- a/meta/meta.go
+++ b/meta/meta.go
@@ -78,7 +78,6 @@ var (
 	mPolicyMagicByte         = CurrentMagicByteVer
 	mDDLTableVersion         = []byte("DDLTableVersion")
 	mConcurrentDDL           = []byte("concurrentDDL")
-	mInFlashbackCluster      = []byte("InFlashbackCluster")
 	mFlashbackHistoryTSRange = []byte("FlashbackHistoryTSRange")
 )
 
@@ -606,24 +605,6 @@ func (m *Meta) CheckMDLTableExists() (bool, error) {
 	return bytes.Equal(v, []byte("2")), nil
 }
 
-// SetFlashbackClusterJobID set flashback cluster jobID
-func (m *Meta) SetFlashbackClusterJobID(jobID int64) error {
-	return errors.Trace(m.txn.Set(mInFlashbackCluster, m.jobIDKey(jobID)))
-}
-
-// GetFlashbackClusterJobID returns flashback cluster jobID.
-func (m *Meta) GetFlashbackClusterJobID() (int64, error) {
-	val, err := m.txn.Get(mInFlashbackCluster)
-	if err != nil {
-		return 0, errors.Trace(err)
-	}
-	if len(val) == 0 {
-		return 0, nil
-	}
-
-	return int64(binary.BigEndian.Uint64(val)), nil
-}
-
 // TSRange store a range time
 type TSRange struct {
 	StartTS uint64
diff --git a/parser/model/ddl.go b/parser/model/ddl.go
index 330a00a90ad2e..19164959815a3 100644
--- a/parser/model/ddl.go
+++ b/parser/model/ddl.go
@@ -678,7 +678,12 @@ func (job *Job) hasDependentTableForExchangePartition(other *Job) (bool, error)
 // How to check the job depends on "other"?
 // 1. The two jobs handle the same database when one of the two jobs is an ActionDropSchema or ActionCreateSchema type.
 // 2. Or the two jobs handle the same table.
+// 3. Or the other job is a flashback cluster job.
 func (job *Job) IsDependentOn(other *Job) (bool, error) {
+	if other.Type == ActionFlashbackCluster {
+		return true, nil
+	}
+
 	isDependent, err := job.hasDependentSchema(other)
 	if err != nil || isDependent {
 		return isDependent, errors.Trace(err)
diff --git a/parser/model/model_test.go b/parser/model/model_test.go
index f897f7eb584f0..47a8ecad6e4a4 100644
--- a/parser/model/model_test.go
+++ b/parser/model/model_test.go
@@ -442,6 +442,19 @@ func TestJobCodec(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, isDependent)
 
+	// Test that other DDL jobs are dependent on an ActionFlashbackCluster job.
+	job15 := &Job{
+		ID:         16,
+		Type:       ActionFlashbackCluster,
+		BinlogInfo: &HistoryInfo{},
+		Args:       []interface{}{0, map[string]interface{}{}, "ON", true},
+	}
+	job15.RawArgs, err = json.Marshal(job15.Args)
+	require.NoError(t, err)
+	isDependent, err = job.IsDependentOn(job15)
+	require.NoError(t, err)
+	require.True(t, isDependent)
+
 	require.Equal(t, false, job.IsCancelled())
 	b, err := job.Encode(false)
 	require.NoError(t, err)
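Taken together, the changes above replace the mInFlashbackCluster meta key with checks at job-submission time (both the job queue and the job table paths) plus a scheduling rule: every DDL job is treated as dependent on a flashback cluster job. Below is a minimal, self-contained sketch of that dependency rule, using simplified stand-in types rather than TiDB's real ones; the actual check is the early return added to Job.IsDependentOn in parser/model/ddl.go, and the existing schema/table overlap checks are elided:

```go
package main

import "fmt"

// ActionType and the two constants below only mimic the shape of parser/model's
// job-type enum for this sketch; the real values live in parser/model.
type ActionType int

const (
	ActionAddColumn ActionType = iota
	ActionFlashbackCluster
)

// Job is a reduced stand-in for model.Job.
type Job struct {
	ID   int64
	Type ActionType
}

// isDependentOn sketches the rule added to Job.IsDependentOn: besides the existing
// schema/table overlap checks (elided here), every job is considered dependent on a
// flashback cluster job, so the DDL worker will not run it concurrently with the flashback.
func (job *Job) isDependentOn(other *Job) bool {
	if other.Type == ActionFlashbackCluster {
		return true
	}
	// ... schema/table overlap checks would follow here ...
	return false
}

func main() {
	flashback := &Job{ID: 1, Type: ActionFlashbackCluster}
	addColumn := &Job{ID: 2, Type: ActionAddColumn}
	fmt.Println(addColumn.isDependentOn(flashback)) // true: the new job waits behind the flashback
	fmt.Println(flashback.isDependentOn(addColumn)) // false: the new rule is one-directional
}
```

As written in the diff, the new rule inspects only other.Type, so it is one-directional: jobs submitted while a flashback cluster job exists wait behind it, while the flashback job itself falls back to the ordinary dependency checks.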