Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#55049
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
tangenta authored and ti-chi-bot committed Aug 26, 2024
1 parent d36d14a commit f1e0ea6
Show file tree
Hide file tree
Showing 3 changed files with 2,563 additions and 2 deletions.
47 changes: 45 additions & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type reorgCtx struct {
// If the reorganization job is done, we will use this channel to notify outer.
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
// use a persistent job list.
doneCh chan error
doneCh chan reorgFnResult
// rowCount is used to simulate a job's row count.
rowCount int64
// notifyCancelReorgJob is used to notify the backfilling goroutine if the DDL job is cancelled.
Expand All @@ -79,10 +79,33 @@ type reorgCtx struct {
}
}

<<<<<<< HEAD:ddl/reorg.go
// nullableKey can store <nil> kv.Key.
// Storing a nil object to atomic.Value can lead to panic. This is a workaround.
type nullableKey struct {
key kv.Key
=======
// reorgFnResult records the DDL owner TS before executing reorg function, in order to help
// receiver determine if the result is from reorg function of previous DDL owner in this instance.
type reorgFnResult struct {
ownerTS int64
err error
}

func newReorgExprCtx() exprctx.ExprContext {
evalCtx := contextstatic.NewStaticEvalContext(
contextstatic.WithSQLMode(mysql.ModeNone),
contextstatic.WithTypeFlags(types.DefaultStmtFlags),
contextstatic.WithErrLevelMap(stmtctx.DefaultStmtErrLevels),
)

planCacheTracker := contextutil.NewPlanCacheTracker(contextutil.IgnoreWarn)

return contextstatic.NewStaticExprContext(
contextstatic.WithEvalCtx(evalCtx),
contextstatic.WithPlanCacheTracker(&planCacheTracker),
)
>>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)):pkg/ddl/reorg.go
}

// newContext gets a context. It is only used for adding column in reorganization state.
Expand Down Expand Up @@ -205,11 +228,22 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
if job.IsCancelling() {
return dbterror.ErrCancelledDDLJob
}
<<<<<<< HEAD:ddl/reorg.go
rc = w.newReorgCtx(reorgInfo)
w.wg.Add(1)
go func() {
defer w.wg.Done()
rc.doneCh <- f()
=======

beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS()
rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
w.wg.Add(1)
go func() {
defer w.wg.Done()
err := reorgFn()
rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
>>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)):pkg/ddl/reorg.go
}()
}

Expand All @@ -225,7 +259,16 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo

// wait reorganization job done or timeout
select {
case err := <-rc.doneCh:
case res := <-rc.doneCh:
err := res.err
curTS := w.ddlCtx.reorgCtx.getOwnerTS()
if res.ownerTS != curTS {
d.removeReorgCtx(job.ID)
logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry",
zap.Int64("prevTS", res.ownerTS),
zap.Int64("curTS", curTS))
return dbterror.ErrWaitReorgTimeout
}
// Since job is cancelled,we don't care about its partial counts.
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
d.removeReorgCtx(job)
Expand Down
Loading

0 comments on commit f1e0ea6

Please sign in to comment.