diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 9f90ad1039525..dac2f01216edb 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -535,16 +535,20 @@ func cleanMDLInfo(pool *sessionPool, jobID int64, ec *clientv3.Client) {
 }
 
 // checkMDLInfo checks if metadata lock info exists. It means the schema is locked by some TiDBs if exists.
-func checkMDLInfo(jobID int64, pool *sessionPool) (bool, error) {
-	sql := fmt.Sprintf("select * from mysql.tidb_mdl_info where job_id = %d", jobID)
+func checkMDLInfo(jobID int64, pool *sessionPool) (bool, int64, error) {
+	sql := fmt.Sprintf("select version from mysql.tidb_mdl_info where job_id = %d", jobID)
 	sctx, _ := pool.get()
 	defer pool.put(sctx)
 	sess := newSession(sctx)
 	rows, err := sess.execute(context.Background(), sql, "check-mdl-info")
 	if err != nil {
-		return false, err
+		return false, 0, err
 	}
-	return len(rows) > 0, nil
+	if len(rows) == 0 {
+		return false, 0, nil
+	}
+	ver := rows[0].GetInt64(0)
+	return true, ver, nil
 }
 
 func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
@@ -1376,6 +1380,32 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
 		zap.String("job", job.String()))
 }
 
+// waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits for the metadata lock of the latest version of this DDL to be acquired.
+func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error {
+	failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
+		if val.(bool) {
+			if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
+				panic("check down before update global version failed")
+			} else {
+				mockDDLErrOnce = -1
+			}
+		}
+	})
+
+	timeStart := time.Now()
+	// OwnerCheckAllVersions returns only when all TiDB schemas are synced (excluding the isolated TiDB).
+	err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
+	if err != nil {
+		logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
+		return err
+	}
+	logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
+		zap.Int64("ver", latestSchemaVersion),
+		zap.Duration("take time", time.Since(timeStart)),
+		zap.String("job", job.String()))
+	return nil
+}
+
 // waitSchemaSynced handles the following situation:
 // If the job enters a new state, and the worker crashs when it's in the process of waiting for 2 * lease time,
 // Then the worker restarts quickly, we may run the job immediately again,
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 5bd702232a9f6..894c4c45b8380 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -236,7 +236,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 		// check if this ddl job is synced to all servers.
 		if !d.isSynced(job) || d.once.Load() {
 			if variable.EnableMDL.Load() {
-				exist, err := checkMDLInfo(job.ID, d.sessPool)
+				exist, version, err := checkMDLInfo(job.ID, d.sessPool)
 				if err != nil {
 					logutil.BgLogger().Warn("[ddl] check MDL info failed", zap.Error(err), zap.String("job", job.String()))
 					// Release the worker resource.
@@ -245,10 +245,8 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 				} else if exist {
 					// Release the worker resource.
 					pool.put(wk)
-					err = waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
+					err = waitSchemaSyncedForMDL(d.ddlCtx, job, version)
 					if err != nil {
-						logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
-						time.Sleep(time.Second)
 						return
 					}
 					d.once.Store(false)