Skip to content

Commit

Permalink
*: delete mInFlashbackCluster related codes (#38241)
Browse files Browse the repository at this point in the history
ref #37197
  • Loading branch information
Defined2014 committed Sep 30, 2022
1 parent d4f4c81 commit 8f18fce
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 104 deletions.
76 changes: 27 additions & 49 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,39 +383,22 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
switch job.SchemaState {
// Stage 1, check and set FlashbackClusterJobID, and save the PD schedule.
case model.StateNone:
flashbackJobID, err := t.GetFlashbackClusterJobID()
if err = savePDSchedule(job); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
readOnlyValue, err = getTiDBSuperReadOnly(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, err
return ver, errors.Trace(err)
}
if flashbackJobID == 0 || flashbackJobID == job.ID {
err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
return meta.NewMeta(txn).SetFlashbackClusterJobID(job.ID)
})
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if err = savePDSchedule(job); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
readOnlyValue, err = getTiDBSuperReadOnly(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[readOnlyArgsOffset] = &readOnlyValue
gcEnableValue, err := gcutil.CheckGCEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[gcEnabledArgsOffset] = &gcEnableValue
} else {
job.Args[readOnlyArgsOffset] = &readOnlyValue
gcEnableValue, err := gcutil.CheckGCEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
return ver, errors.Trace(err)
}
job.Args[gcEnabledArgsOffset] = &gcEnableValue
job.SchemaState = model.StateWriteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
Expand Down Expand Up @@ -461,11 +444,15 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}

func finishFlashbackCluster(w *worker, job *model.Job) error {
// Didn't do anything during flashback, return directly
if job.SchemaState == model.StateNone {
return nil
}

var flashbackTS uint64
var pdScheduleValue map[string]interface{}
var readOnlyValue string
var gcEnabled bool
var jobID int64

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil {
return errors.Trace(err)
Expand All @@ -478,32 +465,23 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {

err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err = t.GetFlashbackClusterJobID()
if err != nil {
if err = recoverPDSchedule(pdScheduleValue); err != nil {
return err
}
if jobID == job.ID {
if err = recoverPDSchedule(pdScheduleValue); err != nil {
if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
return err
}
if gcEnabled {
if err = gcutil.EnableGC(sess); err != nil {
return err
}
if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
}
if job.IsDone() || job.IsSynced() {
gcSafePoint, err := gcutil.GetGCSafePoint(sess)
if err != nil {
return err
}
if gcEnabled {
if err = gcutil.EnableGC(sess); err != nil {
return err
}
}
if job.IsDone() || job.IsSynced() {
gcSafePoint, err := gcutil.GetGCSafePoint(sess)
if err != nil {
return err
}
if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil {
return err
}
}
if err = t.SetFlashbackClusterJobID(0); err != nil {
if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil {
return err
}
}
Expand Down
42 changes: 42 additions & 0 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,44 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
}

func TestAddDDLDuringFlashback(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originHook := dom.DDL().GetHook()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")

ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

hook := &ddl.TestDDLCallback{Do: dom}
hook.OnJobRunBeforeExported = func(job *model.Job) {
assert.Equal(t, model.ActionFlashbackCluster, job.Type)
if job.SchemaState == model.StateWriteReorganization {
_, err := tk.Exec("alter table t add column b int")
assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job")
}
}
dom.DDL().SetHook(hook)
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
}

func TestGlobalVariablesOnFlashback(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originHook := dom.DDL().GetHook()
Expand Down Expand Up @@ -247,12 +285,16 @@ func TestCancelFlashbackCluster(t *testing.T) {
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

// Try canceled on StateWriteOnly, cancel success
tk.MustExec("set global tidb_super_read_only = off")
hook := newCancelJobHook(t, store, dom, func(job *model.Job) bool {
return job.SchemaState == model.StateWriteOnly
})
dom.DDL().SetHook(hook)
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
hook.MustCancelDone(t)
rs, err := tk.Exec("show variables like 'tidb_super_read_only'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)

// Try canceled on StateWriteReorganization, cancel failed
hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
Expand Down
56 changes: 24 additions & 32 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,17 +319,20 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
return kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
ids, err := t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err := t.GenGlobalIDs(len(tasks))

jobs, err := t.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
if err != nil {
return errors.Trace(err)
}
for _, job := range jobs {
if job.Type == model.ActionFlashbackCluster {
return errors.Errorf("Can't add ddl job, have flashback cluster job")
}
}

for i, task := range tasks {
job := task.job
Expand Down Expand Up @@ -386,17 +389,24 @@ func setJobStateToQueueing(job *model.Job) {
func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
var ids []int64
var err error

sess, err := d.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer d.sessPool.put(sess)
job, err := getJobsBySQL(newSession(sess), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
if err != nil {
return errors.Trace(err)
}
if len(job) != 0 {
return errors.Errorf("Can't add ddl job, have flashback cluster job")
}

startTS := uint64(0)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err = kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err = t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
Expand All @@ -415,13 +425,8 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
jobTasks[i] = job
injectModifyJobArgFailPoint(job)
}
sess, err1 := d.sessPool.get()
if err1 == nil {
sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
err1 = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
d.sessPool.put(sess)
}
err = err1
sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
err = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -1130,19 +1135,6 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
if job.Type != model.ActionMultiSchemaChange {
logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
}

// Should check flashbackClusterJobID.
// Some ddl jobs maybe added between check and insert into ddl job table.
flashbackJobID, err := t.GetFlashbackClusterJobID()
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if flashbackJobID != 0 && flashbackJobID != job.ID {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Can't do ddl job, cluster is flashing back now")
}

timeStart := time.Now()
if job.RealStartTS == 0 {
job.RealStartTS = t.StartTS
Expand Down
13 changes: 9 additions & 4 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func (d *ddl) getGeneralJob(sess *session) (*model.Job, error) {
return d.checkJobIsRunnable(sess, sql)
}
// Check if there is any running job works on the same table.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0", job.ID)
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+
"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+
"or (type = %d and processing)", job.ID, model.ActionFlashbackCluster)
return d.checkJobIsRunnable(sess, sql)
})
}
Expand All @@ -144,9 +146,12 @@ func (d *ddl) checkJobIsRunnable(sess *session, sql string) (bool, error) {

func (d *ddl) getReorgJob(sess *session) (*model.Job, error) {
return d.getJob(sess, reorg, func(job *model.Job) (bool, error) {
// Check if there is any drop schema ddl running.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) or (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) limit 1",
strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)))
// Check if there is any block ddl running, like drop schema and flashback cluster.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
"or (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+
"or (type = %d and processing) limit 1",
strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)), model.ActionFlashbackCluster)
return d.checkJobIsRunnable(sess, sql)
})
}
Expand Down
19 changes: 0 additions & 19 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ var (
mPolicyMagicByte = CurrentMagicByteVer
mDDLTableVersion = []byte("DDLTableVersion")
mConcurrentDDL = []byte("concurrentDDL")
mInFlashbackCluster = []byte("InFlashbackCluster")
mFlashbackHistoryTSRange = []byte("FlashbackHistoryTSRange")
)

Expand Down Expand Up @@ -606,24 +605,6 @@ func (m *Meta) CheckMDLTableExists() (bool, error) {
return bytes.Equal(v, []byte("2")), nil
}

// SetFlashbackClusterJobID set flashback cluster jobID
func (m *Meta) SetFlashbackClusterJobID(jobID int64) error {
return errors.Trace(m.txn.Set(mInFlashbackCluster, m.jobIDKey(jobID)))
}

// GetFlashbackClusterJobID returns flashback cluster jobID.
func (m *Meta) GetFlashbackClusterJobID() (int64, error) {
val, err := m.txn.Get(mInFlashbackCluster)
if err != nil {
return 0, errors.Trace(err)
}
if len(val) == 0 {
return 0, nil
}

return int64(binary.BigEndian.Uint64(val)), nil
}

// TSRange store a range time
type TSRange struct {
StartTS uint64
Expand Down
5 changes: 5 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,12 @@ func (job *Job) hasDependentTableForExchangePartition(other *Job) (bool, error)
// How to check the job depends on "other"?
// 1. The two jobs handle the same database when one of the two jobs is an ActionDropSchema or ActionCreateSchema type.
// 2. Or the two jobs handle the same table.
// 3. Or other job is flashback cluster.
func (job *Job) IsDependentOn(other *Job) (bool, error) {
if other.Type == ActionFlashbackCluster {
return true, nil
}

isDependent, err := job.hasDependentSchema(other)
if err != nil || isDependent {
return isDependent, errors.Trace(err)
Expand Down
13 changes: 13 additions & 0 deletions parser/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,19 @@ func TestJobCodec(t *testing.T) {
require.NoError(t, err)
require.True(t, isDependent)

// test ActionFlashbackCluster with other ddl jobs are dependent.
job15 := &Job{
ID: 16,
Type: ActionFlashbackCluster,
BinlogInfo: &HistoryInfo{},
Args: []interface{}{0, map[string]interface{}{}, "ON", true},
}
job15.RawArgs, err = json.Marshal(job15.Args)
require.NoError(t, err)
isDependent, err = job.IsDependentOn(job15)
require.NoError(t, err)
require.True(t, isDependent)

require.Equal(t, false, job.IsCancelled())
b, err := job.Encode(false)
require.NoError(t, err)
Expand Down

0 comments on commit 8f18fce

Please sign in to comment.