Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix a bug that MDL may progress unexpectedly or block forever #46921

Merged
merged 5 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ const (
OnExistReplace

jobRecordCapacity = 16
jobOnceCapacity = 1000
)

var (
Expand Down Expand Up @@ -289,14 +290,14 @@ type waitSchemaSyncedController struct {
mu sync.RWMutex
job map[int64]struct{}

// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
once *atomicutil.Bool
// Use to check if the DDL job is the first run on this owner.
onceMap map[int64]struct{}
}

func newWaitSchemaSyncedController() *waitSchemaSyncedController {
return &waitSchemaSyncedController{
job: make(map[int64]struct{}, jobRecordCapacity),
once: atomicutil.NewBool(true),
job: make(map[int64]struct{}, jobRecordCapacity),
onceMap: make(map[int64]struct{}, jobOnceCapacity),
}
}

Expand All @@ -319,6 +320,24 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
delete(w.job, job.ID)
}

// maybeAlreadyRunOnce returns true means that the job may be the first run on this owner.
// Returns false means that the job must not be the first run on this owner.
func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.onceMap[id]
return ok
}

func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
w.mu.Lock()
defer w.mu.Unlock()
if len(w.onceMap) > jobOnceCapacity {
w.onceMap = make(map[int64]struct{}, jobRecordCapacity)
}
w.onceMap[id] = struct{}{}
}

// ddlCtx is the context when we use worker to handle DDL jobs.
type ddlCtx struct {
ctx context.Context
Expand Down
29 changes: 16 additions & 13 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,11 +842,11 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
// which may act like a deadlock.
w.jobLogger(job).Info("run DDL job failed, sleeps a while then retries it.",
zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
}

// In test and job is cancelling we can ignore the sleep
if !(intest.InTest && job.IsCancelling()) {
time.Sleep(GetWaitTimeWhenErrorOccurred())
// In test and job is cancelling we can ignore the sleep
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we combine conditions line838 and line840?

if !(intest.InTest && job.IsCancelling()) {
time.Sleep(GetWaitTimeWhenErrorOccurred())
}
}
}

Expand Down Expand Up @@ -1190,14 +1190,14 @@ func toTError(err error) *terror.Error {
return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
}

// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
// waitSchemaChanged waits for the completion of updating all servers' schema or MDL synced. In order to make sure that happens,
// we wait at most 2 * lease time(sessionTTL, 90 seconds).
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return
return nil
}
if waitTime == 0 {
return
return nil
}

timeStart := time.Now()
Expand All @@ -1208,29 +1208,33 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in

if latestSchemaVersion == 0 {
logutil.Logger(d.ctx).Info("schema version doesn't change", zap.String("category", "ddl"))
return
return nil
}

err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("update latest schema version failed", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Error(err))
if variable.EnableMDL.Load() {
return err
}
if terror.ErrorEqual(err, context.DeadlineExceeded) {
// If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker.
// There is no need to use etcd to sync. The function returns directly.
return
return nil
}
}

// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("wait latest schema version encounter error", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return
return err
}
logutil.Logger(d.ctx).Info("wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)", zap.String("category", "ddl"),
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}

// waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
Expand Down Expand Up @@ -1289,8 +1293,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
}
})

waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
return nil
return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
Expand Down
17 changes: 11 additions & 6 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (d *ddl) startDispatchLoop() {
}
if !d.isOwner() {
isOnce = true
d.once.Store(true)
d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
time.Sleep(dispatchLoopWaitingDuration)
continue
}
Expand Down Expand Up @@ -378,7 +378,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if !d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this cause all jobs to have an extra "waitSchemaSynced"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After GC the onceMap, the jobs need to do an extra checkMDLInfo. If waitSchemaSynced passed already, checkMDLInfo would return nil for this job.

if variable.EnableMDL.Load() {
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
Expand All @@ -393,7 +393,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
// Don't have a worker now.
return
Expand All @@ -407,7 +407,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
pool.put(wk)
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
}
}

Expand All @@ -426,9 +426,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
})

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// If the job is done or still running or rolling back, we will wait 2 * lease time or util MDL synced to guarantee other servers to update
// the newest schema.
waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
if err != nil {
// May be caused by server closing, shouldn't clean the MDL info.
logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
d.synced(job)

Expand Down
8 changes: 0 additions & 8 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,13 +586,6 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
jobID = job.ID
times = 1
}
// Make sure we do jobID first, then do jobID+1.
if job.ID == jobID && job.SchemaState == model.StateWriteReorganization && job.State == model.JobStateQueueing && times == 1 {
times = 2
}
if job.ID == jobID+1 && job.SchemaState == model.StateNone && job.State == model.JobStateQueueing && times == 2 {
times = 3
}
tangenta marked this conversation as resolved.
Show resolved Hide resolved
if job.ID == jobID && job.State == model.JobStateDone && job.SchemaState == model.StatePublic {
wg.Done()
}
Expand Down Expand Up @@ -621,7 +614,6 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
require.Equal(t, session.CurrentBootstrapVersion, ver)

wg.Wait()
require.Equal(t, 3, times)
// Make sure the second add index operation is successful.
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id=%d or job_id=%d order by job_id", jobID, jobID+1)
rows, err := execute(context.Background(), seLatestV, sql)
Expand Down