*: delete mInFlashbackCluster related codes #38241

Merged · 8 commits · Sep 30, 2022
76 changes: 27 additions & 49 deletions ddl/cluster.go
@@ -383,39 +383,22 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
 	switch job.SchemaState {
 	// Stage 1, check and set FlashbackClusterJobID, and save the PD schedule.
 	case model.StateNone:
-		flashbackJobID, err := t.GetFlashbackClusterJobID()
+		if err = savePDSchedule(job); err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+		readOnlyValue, err = getTiDBSuperReadOnly(sess)
 		if err != nil {
 			job.State = model.JobStateCancelled
-			return ver, err
+			return ver, errors.Trace(err)
 		}
-		if flashbackJobID == 0 || flashbackJobID == job.ID {
-			err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
-				return meta.NewMeta(txn).SetFlashbackClusterJobID(job.ID)
-			})
-			if err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			if err = savePDSchedule(job); err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			readOnlyValue, err = getTiDBSuperReadOnly(sess)
-			if err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			job.Args[readOnlyArgsOffset] = &readOnlyValue
-			gcEnableValue, err := gcutil.CheckGCEnable(sess)
-			if err != nil {
-				job.State = model.JobStateCancelled
-				return ver, errors.Trace(err)
-			}
-			job.Args[gcEnabledArgsOffset] = &gcEnableValue
-		} else {
+		job.Args[readOnlyArgsOffset] = &readOnlyValue
+		gcEnableValue, err := gcutil.CheckGCEnable(sess)
+		if err != nil {
 			job.State = model.JobStateCancelled
-			return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
+			return ver, errors.Trace(err)
 		}
+		job.Args[gcEnabledArgsOffset] = &gcEnableValue
 		job.SchemaState = model.StateWriteOnly
 		return ver, nil
 	// Stage 2, check flashbackTS, close GC and PD schedule.
@@ -461,11 +444,15 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
 }
 
 func finishFlashbackCluster(w *worker, job *model.Job) error {
+	// Didn't do anything during flashback, return directly
+	if job.SchemaState == model.StateNone {
+		return nil
+	}
+
 	var flashbackTS uint64
 	var pdScheduleValue map[string]interface{}
 	var readOnlyValue string
 	var gcEnabled bool
-	var jobID int64
 
 	if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil {
 		return errors.Trace(err)
@@ -478,32 +465,23 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
 
 	err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
-		jobID, err = t.GetFlashbackClusterJobID()
-		if err != nil {
-			return err
-		}
-		if jobID == job.ID {
-			if err = recoverPDSchedule(pdScheduleValue); err != nil {
-				return err
-			}
-			if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
-				return err
-			}
-			if gcEnabled {
-				if err = gcutil.EnableGC(sess); err != nil {
-					return err
-				}
-			}
-			if job.IsDone() || job.IsSynced() {
-				gcSafePoint, err := gcutil.GetGCSafePoint(sess)
-				if err != nil {
-					return err
-				}
-				if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil {
-					return err
-				}
-			}
-			if err = t.SetFlashbackClusterJobID(0); err != nil {
-				return err
-			}
-		}
+		if err = recoverPDSchedule(pdScheduleValue); err != nil {
+			return err
+		}
+		if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
+			return err
+		}
+		if gcEnabled {
+			if err = gcutil.EnableGC(sess); err != nil {
+				return err
+			}
+		}
+		if job.IsDone() || job.IsSynced() {
+			gcSafePoint, err := gcutil.GetGCSafePoint(sess)
+			if err != nil {
+				return err
+			}
+			if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil {
+				return err
+			}
+		}
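
Note on the rewritten control flow: with the mInFlashbackCluster meta key gone, the only gate finishFlashbackCluster needs is the job's own SchemaState — a flashback job still in StateNone never saved the PD schedule, toggled GC, or flipped tidb_super_read_only, so there is nothing to restore (hence the new early return). A minimal standalone sketch of that invariant, using stand-in constants rather than the real parser/model types:

// flashback_gate_sketch.go — illustration only, not code from this PR.
package main

import "fmt"

type SchemaState int

const (
	StateNone SchemaState = iota // job registered but nothing mutated yet
	StateWriteOnly               // PD schedule, GC flag, read-only flag already saved
	StateWriteReorganization
)

// needsCleanup mirrors the early return added to finishFlashbackCluster.
func needsCleanup(s SchemaState) bool {
	return s != StateNone
}

func main() {
	fmt.Println(needsCleanup(StateNone))      // false: cancelled before doing anything
	fmt.Println(needsCleanup(StateWriteOnly)) // true: cluster state must be restored
}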
42 changes: 42 additions & 0 deletions ddl/cluster_test.go
@@ -162,6 +162,44 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
 	require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
 }
 
+func TestAddDDLDuringFlashback(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	originHook := dom.DDL().GetHook()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t(a int)")
+
+	ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
+	require.NoError(t, err)
+
+	injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
+	require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
+		fmt.Sprintf("return(%v)", injectSafeTS)))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
+		fmt.Sprintf("return(%v)", injectSafeTS)))
+
+	timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
+	defer resetGC()
+	tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
+
+	hook := &ddl.TestDDLCallback{Do: dom}
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		assert.Equal(t, model.ActionFlashbackCluster, job.Type)
+		if job.SchemaState == model.StateWriteReorganization {
+			_, err := tk.Exec("alter table t add column b int")
+			assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job")
+		}
+	}
+	dom.DDL().SetHook(hook)
+	tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
+
+	dom.DDL().SetHook(originHook)
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
+	require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
+}
+
 func TestGlobalVariablesOnFlashback(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	originHook := dom.DDL().GetHook()
@@ -247,12 +285,16 @@ func TestCancelFlashbackCluster(t *testing.T) {
 	tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
 
 	// Try canceled on StateWriteOnly, cancel success
+	tk.MustExec("set global tidb_super_read_only = off")
 	hook := newCancelJobHook(t, store, dom, func(job *model.Job) bool {
 		return job.SchemaState == model.StateWriteOnly
 	})
 	dom.DDL().SetHook(hook)
 	tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
 	hook.MustCancelDone(t)
+	rs, err := tk.Exec("show variables like 'tidb_super_read_only'")
+	require.NoError(t, err)
+	require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
 
 	// Try canceled on StateWriteReorganization, cancel failed
 	hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
56 changes: 24 additions & 32 deletions ddl/ddl_worker.go
@@ -319,17 +319,20 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error {
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
 	return kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
-		jobID, err := t.GetFlashbackClusterJobID()
+		ids, err := t.GenGlobalIDs(len(tasks))
 		if err != nil {
 			return errors.Trace(err)
 		}
-		if jobID != 0 {
-			return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
-		}
-		ids, err := t.GenGlobalIDs(len(tasks))
+
+		jobs, err := t.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
 		if err != nil {
 			return errors.Trace(err)
 		}
+		for _, job := range jobs {
+			if job.Type == model.ActionFlashbackCluster {
+				return errors.Errorf("Can't add ddl job, have flashback cluster job")
+			}
+		}
 
 		for i, task := range tasks {
 			job := task.job
@@ -386,17 +389,24 @@ func setJobStateToQueueing(job *model.Job) {
 func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
 	var ids []int64
 	var err error
+
+	sess, err := d.sessPool.get()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer d.sessPool.put(sess)
+	job, err := getJobsBySQL(newSession(sess), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(job) != 0 {
+		return errors.Errorf("Can't add ddl job, have flashback cluster job")
+	}
+
 	startTS := uint64(0)
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
 	err = kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
-		jobID, err := t.GetFlashbackClusterJobID()
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if jobID != 0 {
-			return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
-		}
 		ids, err = t.GenGlobalIDs(len(tasks))
 		if err != nil {
 			return errors.Trace(err)
@@ -415,13 +425,8 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
 			jobTasks[i] = job
 			injectModifyJobArgFailPoint(job)
 		}
-		sess, err1 := d.sessPool.get()
-		if err1 == nil {
-			sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
-			err1 = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
-			d.sessPool.put(sess)
-		}
-		err = err1
+		sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
+		err = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
 	}
 	return errors.Trace(err)
 }
@@ -1130,19 +1135,6 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 	if job.Type != model.ActionMultiSchemaChange {
 		logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
 	}
-
-	// Should check flashbackClusterJobID.
-	// Some ddl jobs maybe added between check and insert into ddl job table.
-	flashbackJobID, err := t.GetFlashbackClusterJobID()
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, err
-	}
-	if flashbackJobID != 0 && flashbackJobID != job.ID {
-		job.State = model.JobStateCancelled
-		return ver, errors.Errorf("Can't do ddl job, cluster is flashing back now")
-	}
-
 	timeStart := time.Now()
 	if job.RealStartTS == 0 {
 		job.RealStartTS = t.StartTS
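
Note: both enqueue paths now derive the "flashback in progress" signal from the job storage itself — addBatchDDLJobs2Queue scans the KV job queue, addBatchDDLJobs2Table queries mysql.tidb_ddl_job for a job of type ActionFlashbackCluster — instead of reading a separate meta key that could go stale. A minimal standalone sketch of the guard's shape, with stand-in types rather than the real model package:

// enqueue_guard_sketch.go — illustration only, not code from this PR.
package main

import (
	"errors"
	"fmt"
)

type ActionType int

const (
	ActionAddColumn ActionType = iota
	ActionFlashbackCluster
)

type Job struct {
	ID   int64
	Type ActionType
}

// rejectIfFlashbackQueued mirrors the loop added to addBatchDDLJobs2Queue.
func rejectIfFlashbackQueued(queued []*Job) error {
	for _, job := range queued {
		if job.Type == ActionFlashbackCluster {
			return errors.New("Can't add ddl job, have flashback cluster job")
		}
	}
	return nil
}

func main() {
	queue := []*Job{{ID: 1, Type: ActionFlashbackCluster}}
	fmt.Println(rejectIfFlashbackQueued(queue)) // new jobs are rejected
	fmt.Println(rejectIfFlashbackQueued(nil))   // nil: nothing blocks the enqueue
}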
13 changes: 9 additions & 4 deletions ddl/job_table.go
@@ -132,7 +132,9 @@ func (d *ddl) getGeneralJob(sess *session) (*model.Job, error) {
 			return d.checkJobIsRunnable(sess, sql)
 		}
 		// Check if there is any running job works on the same table.
-		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0", job.ID)
+		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+
+			"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+
+			"or (type = %d and processing)", job.ID, model.ActionFlashbackCluster)
 		return d.checkJobIsRunnable(sess, sql)
 	})
 }
@@ -144,9 +146,12 @@ func (d *ddl) checkJobIsRunnable(sess *session, sql string) (bool, error) {

 func (d *ddl) getReorgJob(sess *session) (*model.Job, error) {
 	return d.getJob(sess, reorg, func(job *model.Job) (bool, error) {
-		// Check if there is any drop schema ddl running.
-		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) or (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) limit 1",
-			strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)))
+		// Check if there is any block ddl running, like drop schema and flashback cluster.
+		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
+			"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
+			"or (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+
+			"or (type = %d and processing) limit 1",
+			strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)), model.ActionFlashbackCluster)
 		return d.checkJobIsRunnable(sess, sql)
 	})
 }
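
Note: both queries are plain fmt.Sprintf templates over mysql.tidb_ddl_job; the CONCAT/REGEXP trick turns the comma-separated *_ids columns into something an anchored pattern can match, and the new `or (type = %d and processing)` clause makes a running flashback job block everything. A standalone sketch that renders the getReorgJob query with made-up IDs (the numeric action-type constants below are assumptions for illustration, not the real parser/model values):

// reorg_query_sketch.go — illustration only, not code from this PR.
package main

import (
	"fmt"
	"strconv"
)

const (
	actionDropSchema       = 2  // assumed value of model.ActionDropSchema
	actionFlashbackCluster = 62 // assumed value of model.ActionFlashbackCluster
)

func main() {
	schemaID, tableID := int64(101), int64(205) // hypothetical IDs
	sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
		"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
		"or (CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+
		"or (type = %d and processing) limit 1",
		strconv.Quote(strconv.FormatInt(schemaID, 10)), actionDropSchema,
		strconv.Quote(strconv.FormatInt(tableID, 10)), actionFlashbackCluster)
	fmt.Println(sql) // prints the fully rendered blocking-job query
}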
19 changes: 0 additions & 19 deletions meta/meta.go
@@ -78,7 +78,6 @@
 	mPolicyMagicByte         = CurrentMagicByteVer
 	mDDLTableVersion         = []byte("DDLTableVersion")
 	mConcurrentDDL           = []byte("concurrentDDL")
-	mInFlashbackCluster      = []byte("InFlashbackCluster")
 	mFlashbackHistoryTSRange = []byte("FlashbackHistoryTSRange")
 )
 
@@ -606,24 +605,6 @@ func (m *Meta) CheckMDLTableExists() (bool, error) {
 	return bytes.Equal(v, []byte("2")), nil
 }
 
-// SetFlashbackClusterJobID set flashback cluster jobID
-func (m *Meta) SetFlashbackClusterJobID(jobID int64) error {
-	return errors.Trace(m.txn.Set(mInFlashbackCluster, m.jobIDKey(jobID)))
-}
-
-// GetFlashbackClusterJobID returns flashback cluster jobID.
-func (m *Meta) GetFlashbackClusterJobID() (int64, error) {
-	val, err := m.txn.Get(mInFlashbackCluster)
-	if err != nil {
-		return 0, errors.Trace(err)
-	}
-	if len(val) == 0 {
-		return 0, nil
-	}
-
-	return int64(binary.BigEndian.Uint64(val)), nil
-}
-
 // TSRange store a range time
 type TSRange struct {
 	StartTS uint64
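
Note: for reference, the deleted pair stored the flashback job ID as eight big-endian bytes under mInFlashbackCluster (the removed getter above decodes with binary.BigEndian.Uint64, and an empty value decodes to 0, meaning "no flashback job"). A standalone sketch of that round-trip, outside the Meta type:

// meta_key_sketch.go — illustration only, not code from this PR.
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeJobID mirrors the 8-byte big-endian encoding implied by the removed getter.
func encodeJobID(jobID int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(jobID))
	return b
}

// decodeJobID mirrors GetFlashbackClusterJobID's decoding rules.
func decodeJobID(val []byte) int64 {
	if len(val) == 0 {
		return 0 // key never written: no flashback job in flight
	}
	return int64(binary.BigEndian.Uint64(val))
}

func main() {
	fmt.Println(decodeJobID(encodeJobID(38241))) // 38241
	fmt.Println(decodeJobID(nil))                // 0
}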
5 changes: 5 additions & 0 deletions parser/model/ddl.go
@@ -678,7 +678,12 @@ func (job *Job) hasDependentTableForExchangePartition(other *Job) (bool, error)
 // How to check the job depends on "other"?
 // 1. The two jobs handle the same database when one of the two jobs is an ActionDropSchema or ActionCreateSchema type.
 // 2. Or the two jobs handle the same table.
+// 3. Or other job is flashback cluster.
 func (job *Job) IsDependentOn(other *Job) (bool, error) {
+	if other.Type == ActionFlashbackCluster {
+		return true, nil
+	}
+
 	isDependent, err := job.hasDependentSchema(other)
 	if err != nil || isDependent {
 		return isDependent, errors.Trace(err)
13 changes: 13 additions & 0 deletions parser/model/model_test.go
@@ -442,6 +442,19 @@ func TestJobCodec(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, isDependent)
 
+	// test ActionFlashbackCluster with other ddl jobs are dependent.
+	job15 := &Job{
+		ID:         16,
+		Type:       ActionFlashbackCluster,
+		BinlogInfo: &HistoryInfo{},
+		Args:       []interface{}{0, map[string]interface{}{}, "ON", true},
+	}
+	job15.RawArgs, err = json.Marshal(job15.Args)
+	require.NoError(t, err)
+	isDependent, err = job.IsDependentOn(job15)
+	require.NoError(t, err)
+	require.True(t, isDependent)
+
 	require.Equal(t, false, job.IsCancelled())
 	b, err := job.Encode(false)
 	require.NoError(t, err)