Skip to content

Commit

Permalink
ddl: fix waiting for wrong schema version if TiDB server restart (#39985
Browse files Browse the repository at this point in the history
)

close #39981
  • Loading branch information
wjhuang2016 authored Dec 20, 2022
1 parent 9f4dd80 commit 030c506
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
38 changes: 34 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,16 +536,20 @@ func cleanMDLInfo(pool *sessionPool, jobID int64, ec *clientv3.Client) {
}

// checkMDLInfo checks whether metadata lock info exists for the job. If it exists, the schema is locked by some TiDB instances.
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, error) {
sql := fmt.Sprintf("select * from mysql.tidb_mdl_info where job_id = %d", jobID)
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, int64, error) {
sql := fmt.Sprintf("select version from mysql.tidb_mdl_info where job_id = %d", jobID)
sctx, _ := pool.get()
defer pool.put(sctx)
sess := newSession(sctx)
rows, err := sess.execute(context.Background(), sql, "check-mdl-info")
if err != nil {
return false, err
return false, 0, err
}
return len(rows) > 0, nil
if len(rows) == 0 {
return false, 0, nil
}
ver := rows[0].GetInt64(0)
return true, ver, nil
}

func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
Expand Down Expand Up @@ -1377,6 +1381,32 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
zap.String("job", job.String()))
}

// waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits for getting the
// metadata lock of the latest version of this DDL.
//
// It blocks on the schema syncer until OwnerCheckAllVersions reports that
// latestSchemaVersion has been reached for job.ID, then logs how long the wait
// took. Any error from the syncer is logged and returned unchanged.
//
// The wait is bound to d.ctx rather than a background context, so it can be
// cancelled through the DDL context instead of blocking indefinitely.
func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error {
	// Test-only hook: when the failpoint is enabled with a true value, verify
	// that a previously recorded version (mockDDLErrOnce > 0) matches the
	// version being waited for, then mark the check as consumed.
	failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
		if val.(bool) {
			if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
				panic("check down before update global version failed")
			}
			mockDDLErrOnce = -1
		}
	})

	timeStart := time.Now()
	// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
	if err := d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion); err != nil {
		logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
		return err
	}
	logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
		zap.Int64("ver", latestSchemaVersion),
		zap.Duration("take time", time.Since(timeStart)),
		zap.String("job", job.String()))
	return nil
}

// waitSchemaSynced handles the following situation:
// If the job enters a new state, and the worker crashes when it's in the process of waiting for 2 * lease time,
// Then the worker restarts quickly, we may run the job immediately again,
Expand Down
6 changes: 2 additions & 4 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if variable.EnableMDL.Load() {
exist, err := checkMDLInfo(job.ID, d.sessPool)
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
logutil.BgLogger().Warn("[ddl] check MDL info failed", zap.Error(err), zap.String("job", job.String()))
// Release the worker resource.
Expand All @@ -246,10 +246,8 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
} else if exist {
// Release the worker resource.
pool.put(wk)
err = waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
err = waitSchemaSyncedForMDL(d.ddlCtx, job, version)
if err != nil {
logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
time.Sleep(time.Second)
return
}
d.once.Store(false)
Expand Down

0 comments on commit 030c506

Please sign in to comment.