ddl: job context will be canceled when cancel or pause job (#56404)
close #56017, ref #56398
lance6716 authored Oct 15, 2024
1 parent ee4b86d commit 4c1979a
Showing 27 changed files with 393 additions and 280 deletions.
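The core pattern of the change: each DDL job step runs under a context derived from the worker's long-lived context, and that step context is canceled when the job is canceled or paused, so blocked reorg and check work unwinds promptly instead of polling job state. Below is a minimal sketch of that pattern; `worker.workCtx`, `jobContext`, and `jobCtx.stepCtx` appear in the diff, while the constructor and the rest of the wiring are illustrative and not TiDB's actual API.

package main

import (
	"context"
	"fmt"
	"time"
)

// jobContext carries the per-step context; stepCtx is canceled on cancel/pause.
type jobContext struct {
	stepCtx    context.Context
	cancelStep context.CancelFunc
}

// worker holds the long-lived context (named workCtx in this commit).
type worker struct {
	workCtx context.Context
}

// newJobContext derives a cancellable step context from the worker context.
func (w *worker) newJobContext() *jobContext {
	ctx, cancel := context.WithCancel(w.workCtx)
	return &jobContext{stepCtx: ctx, cancelStep: cancel}
}

// runStep blocks until the step finishes or the job is canceled/paused.
func runStep(jobCtx *jobContext) error {
	select {
	case <-time.After(time.Second): // stand-in for real reorg work
		return nil
	case <-jobCtx.stepCtx.Done():
		return jobCtx.stepCtx.Err()
	}
}

func main() {
	w := &worker{workCtx: context.Background()}
	jobCtx := w.newJobContext()
	go func() {
		// simulate an ADMIN CANCEL DDL JOBS request arriving mid-step
		time.Sleep(100 * time.Millisecond)
		jobCtx.cancelStep()
	}()
	fmt.Println(runStep(jobCtx)) // prints "context canceled"
}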
2 changes: 0 additions & 2 deletions pkg/ddl/cancel_test.go
@@ -22,7 +22,6 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/errno"
@@ -262,7 +261,6 @@ func TestCancelVariousJobs(t *testing.T) {
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(2048)`)

// Change some configurations.
ddl.ReorgWaitTimeout = 10 * time.Millisecond
tk.MustExec("set @@tidb_ddl_reorg_batch_size = 8")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1")
tk = testkit.NewTestKit(t, store)
24 changes: 12 additions & 12 deletions pkg/ddl/cluster.go
@@ -712,7 +712,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
switch job.SchemaState {
// Stage 1, check and set FlashbackClusterJobID, and update job args.
case model.StateNone:
if err = savePDSchedule(w.ctx, args); err != nil {
if err = savePDSchedule(w.workCtx, args); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
@@ -746,17 +746,17 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, jobCtx.store, jobCtx.metaMut, job, args.FlashbackTS); err != nil {
if err = checkAndSetFlashbackClusterInfo(w.workCtx, sess, jobCtx.store, jobCtx.metaMut, job, args.FlashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// We should get startTS here to avoid lost startTS when TiDB crashed during send prepare flashback RPC.
args.StartTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
args.StartTS, err = jobCtx.store.GetOracle().GetTimestamp(w.workCtx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
keyRanges, err := getFlashbackKeyRanges(w.ctx, sess, args.FlashbackTS)
keyRanges, err := getFlashbackKeyRanges(w.workCtx, sess, args.FlashbackTS)
if err != nil {
return ver, errors.Trace(err)
}
@@ -779,10 +779,10 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
return updateSchemaVersion(jobCtx, job)
}
// Split region by keyRanges, make sure no unrelated key ranges be locked.
splitRegionsByKeyRanges(w.ctx, jobCtx.store, args.FlashbackKeyRanges)
splitRegionsByKeyRanges(w.workCtx, jobCtx.store, args.FlashbackKeyRanges)
totalRegions.Store(0)
for _, r := range args.FlashbackKeyRanges {
if err = flashbackToVersion(w.ctx, jobCtx.store,
if err = flashbackToVersion(w.workCtx, jobCtx.store,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := SendPrepareFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), args.FlashbackTS, args.StartTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
@@ -795,7 +795,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
args.LockedRegionCnt = totalRegions.Load()

// We should get commitTS here to avoid lost commitTS when TiDB crashed during send flashback RPC.
args.CommitTS, err = jobCtx.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
args.CommitTS, err = jobCtx.store.GetOracle().GetTimestamp(w.workCtx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return ver, errors.Trace(err)
}
@@ -813,7 +813,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, job *model.Job) (ver int
}

for _, r := range args.FlashbackKeyRanges {
if err = flashbackToVersion(w.ctx, jobCtx.store,
if err = flashbackToVersion(w.workCtx, jobCtx.store,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use same startTS as prepare phase to simulate 1PC txn.
stats, err := SendFlashbackToVersionRPC(ctx, jobCtx.store.(tikv.Storage), args.FlashbackTS, args.StartTS, args.CommitTS, r)
@@ -854,7 +854,7 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
}
defer w.sessPool.Put(sess)

err = kv.RunInNewTxn(w.ctx, w.store, true, func(context.Context, kv.Transaction) error {
err = kv.RunInNewTxn(w.workCtx, w.store, true, func(context.Context, kv.Transaction) error {
if err = recoverPDSchedule(w.ctx, args.PDScheduleValue); err != nil {
return errors.Trace(err)
}
@@ -865,18 +865,18 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
}
}

if err = setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBSuperReadOnly, args.SuperReadOnly); err != nil {
if err = setGlobalSysVarFromBool(w.workCtx, sess, variable.TiDBSuperReadOnly, args.SuperReadOnly); err != nil {
return errors.Trace(err)
}

if job.IsCancelled() {
// only restore `tidb_ttl_job_enable` when flashback failed
if err = setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBTTLJobEnable, args.EnableTTLJob); err != nil {
if err = setGlobalSysVarFromBool(w.workCtx, sess, variable.TiDBTTLJobEnable, args.EnableTTLJob); err != nil {
return errors.Trace(err)
}
}

if err := setGlobalSysVarFromBool(w.ctx, sess, variable.TiDBEnableAutoAnalyze, args.EnableAutoAnalyze); err != nil {
if err := setGlobalSysVarFromBool(w.workCtx, sess, variable.TiDBEnableAutoAnalyze, args.EnableAutoAnalyze); err != nil {
return errors.Trace(err)
}

52 changes: 30 additions & 22 deletions pkg/ddl/column.go
@@ -473,7 +473,11 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
return elements
}

func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) error {
func (w *worker) updatePhysicalTableRow(
ctx context.Context,
t table.Table,
reorgInfo *reorgInfo,
) error {
logutil.DDLLogger().Info("start to update table row", zap.Stringer("job", reorgInfo.Job), zap.Stringer("reorgInfo", reorgInfo))
if tbl, ok := t.(table.PartitionedTable); ok {
done := false
@@ -494,7 +498,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
// https://github.com/pingcap/tidb/issues/38297
return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.")
}
err := w.writePhysicalTableRecord(w.ctx, w.sessPool, p, workType, reorgInfo)
err := w.writePhysicalTableRecord(ctx, w.sessPool, p, workType, reorgInfo)
if err != nil {
return err
}
@@ -506,34 +510,30 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.ctx, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
return w.writePhysicalTableRecord(ctx, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
var TestReorgGoroutineRunning = make(chan any)
var TestReorgGoroutineRunning = make(chan struct{})

// updateCurrentElement update the current element for reorgInfo.
func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error {
failpoint.Inject("mockInfiniteReorgLogic", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
a := new(any)
TestReorgGoroutineRunning <- a
for {
time.Sleep(30 * time.Millisecond)
if w.isReorgCancelled(reorgInfo.Job.ID) {
// Job is cancelled. So it can't be done.
failpoint.Return(dbterror.ErrCancelledDDLJob)
}
}
}
func (w *worker) updateCurrentElement(
ctx context.Context,
t table.Table,
reorgInfo *reorgInfo,
) error {
failpoint.Inject("mockInfiniteReorgLogic", func() {
TestReorgGoroutineRunning <- struct{}{}
<-ctx.Done()
// Job is cancelled. So it can't be done.
failpoint.Return(dbterror.ErrCancelledDDLJob)
})
// TODO: Support partition tables.
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
//nolint:forcetypeassert
err := w.updatePhysicalTableRow(t.(table.PhysicalTable), reorgInfo)
err := w.updatePhysicalTableRow(ctx, t.(table.PhysicalTable), reorgInfo)
if err != nil {
return errors.Trace(err)
}
@@ -587,7 +587,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
if err != nil {
return errors.Trace(err)
}
err = w.addTableIndex(t, reorgInfo)
err = w.addTableIndex(ctx, t, reorgInfo)
if err != nil {
return errors.Trace(err)
}
@@ -1151,7 +1151,15 @@ func checkAddColumnTooManyColumns(colNum int) error {

// modifyColsFromNull2NotNull modifies the type definitions of 'null' to 'not null'.
// Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values.
func modifyColsFromNull2NotNull(w *worker, dbInfo *model.DBInfo, tblInfo *model.TableInfo, cols []*model.ColumnInfo, newCol *model.ColumnInfo, isDataTruncated bool) error {
func modifyColsFromNull2NotNull(
ctx context.Context,
w *worker,
dbInfo *model.DBInfo,
tblInfo *model.TableInfo,
cols []*model.ColumnInfo,
newCol *model.ColumnInfo,
isDataTruncated bool,
) error {
// Get sessionctx from context resource pool.
var sctx sessionctx.Context
sctx, err := w.sessPool.Get()
@@ -1169,7 +1177,7 @@ func modifyColsFromNull2NotNull(w *worker, dbInfo *model.DBInfo, tblInfo *model.
})
if !skipCheck {
// If there is a null value inserted, it cannot be modified and needs to be rollback.
err = checkForNullValue(w.ctx, sctx, isDataTruncated, dbInfo.Name, tblInfo.Name, newCol, cols...)
err = checkForNullValue(ctx, sctx, isDataTruncated, dbInfo.Name, tblInfo.Name, newCol, cols...)
if err != nil {
return errors.Trace(err)
}
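In the hunk above, the mockInfiniteReorgLogic failpoint no longer sleeps in a loop polling isReorgCancelled; it simply blocks on <-ctx.Done() until the step context is canceled. A small sketch of that polling-to-cancellation rewrite (both functions and their names are illustrative, not part of TiDB):

package ddlsketch

import (
	"context"
	"errors"
	"time"
)

// waitPolling mirrors the old failpoint body: sleep in a loop and repeatedly
// ask whether the job has been cancelled.
func waitPolling(isCancelled func() bool) error {
	for {
		time.Sleep(30 * time.Millisecond)
		if isCancelled() {
			return errors.New("cancelled DDL job")
		}
	}
}

// waitOnContext mirrors the new body: block until the step context is
// canceled (by a cancel or pause request) and then report the cancellation.
func waitOnContext(ctx context.Context) error {
	<-ctx.Done()
	return errors.New("cancelled DDL job")
}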
5 changes: 1 addition & 4 deletions pkg/ddl/column_type_change_test.go
@@ -582,10 +582,7 @@ func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) {
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockInfiniteReorgLogic", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockInfiniteReorgLogic"))
}()
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/mockInfiniteReorgLogic", `return()`)

tk.MustExec("drop table if exists ctc_goroutine_leak")
tk.MustExec("create table ctc_goroutine_leak (a int)")
14 changes: 10 additions & 4 deletions pkg/ddl/constraint.go
@@ -15,6 +15,7 @@
package ddl

import (
"context"
"fmt"
"strings"

@@ -100,7 +101,7 @@ func (w *worker) onAddCheckConstraint(jobCtx *jobContext, job *model.Job) (ver i
constraintInfoInMeta.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfoInMeta)
err = w.verifyRemainRecordsForCheckConstraint(jobCtx.stepCtx, dbInfo, tblInfo, constraintInfoInMeta)
if err != nil {
if dbterror.ErrCheckConstraintIsViolated.Equal(err) {
job.State = model.JobStateRollingback
@@ -245,7 +246,7 @@ func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, job *model.Job) (ver
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo)
err = w.verifyRemainRecordsForCheckConstraint(jobCtx.stepCtx, dbInfo, tblInfo, constraintInfo)
if err != nil {
if dbterror.ErrCheckConstraintIsViolated.Equal(err) {
job.State = model.JobStateRollingback
@@ -351,7 +352,12 @@ func findDependentColsInExpr(expr ast.ExprNode) map[string]struct{} {
return colsMap
}

func (w *worker) verifyRemainRecordsForCheckConstraint(dbInfo *model.DBInfo, tableInfo *model.TableInfo, constr *model.ConstraintInfo) error {
func (w *worker) verifyRemainRecordsForCheckConstraint(
ctx context.Context,
dbInfo *model.DBInfo,
tableInfo *model.TableInfo,
constr *model.ConstraintInfo,
) error {
// Inject a fail-point to skip the remaining records check.
failpoint.Inject("mockVerifyRemainDataSuccess", func(val failpoint.Value) {
if val.(bool) {
@@ -371,7 +377,7 @@ func (w *worker) verifyRemainRecordsForCheckConstraint(dbInfo *model.DBInfo, tab
// can let the check expression restored string as the filter in where clause directly.
// Prepare internal SQL to fetch data from physical table under this filter.
sql := fmt.Sprintf("select 1 from `%s`.`%s` where not %s limit 1", dbInfo.Name.L, tableInfo.Name.L, constr.ExprString)
ctx := kv.WithInternalSourceType(w.ctx, kv.InternalTxnDDL)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
rows, _, err := sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(ctx, nil, sql)
if err != nil {
return errors.Trace(err)
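verifyRemainRecordsForCheckConstraint now receives jobCtx.stepCtx from its callers and tags that context with the internal source type before running the check query, so the scan is canceled together with the job. A sketch of that call shape, with an illustrative executor interface standing in for TiDB's restricted SQL executor:

package ddlsketch

import (
	"context"
	"fmt"
)

// restrictedExecutor is an illustrative stand-in for the restricted SQL
// executor used above; only the context plumbing matters for this sketch.
type restrictedExecutor interface {
	ExecRestrictedSQL(ctx context.Context, sql string) (rowCount int, err error)
}

// verifyRemainRecords runs the violation check under the caller's step
// context instead of a worker-lifetime context, so a long scan stops as soon
// as the job is canceled or paused.
func verifyRemainRecords(ctx context.Context, exec restrictedExecutor, db, tbl, expr string) error {
	sql := fmt.Sprintf("select 1 from `%s`.`%s` where not %s limit 1", db, tbl, expr)
	rows, err := exec.ExecRestrictedSQL(ctx, sql)
	if err != nil {
		return err // includes context.Canceled once the step context is done
	}
	if rows > 0 {
		return fmt.Errorf("check constraint %s is violated", expr)
	}
	return nil
}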
12 changes: 0 additions & 12 deletions pkg/ddl/ddl.go
@@ -518,18 +518,6 @@ func (dc *ddlCtx) removeReorgCtx(jobID int64) {
}
}

func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) {
rc := dc.getReorgCtx(job.ID)
if rc == nil {
logutil.DDLLogger().Warn("cannot find reorgCtx", zap.Int64("Job ID", job.ID))
return
}
logutil.DDLLogger().Info("notify reorg worker the job's state",
zap.Int64("Job ID", job.ID), zap.Stringer("Job State", job.State),
zap.Stringer("Schema State", job.SchemaState))
rc.notifyJobState(job.State)
}

// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
func EnableTiFlashPoll(d any) {
if dd, ok := d.(*ddl); ok {
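With the step context canceled on cancel or pause, the notifyReorgWorkerJobStateChange path deleted above becomes unnecessary: a reorg wait can select on the context instead of a dedicated job-state notification. A sketch of that simplification (names are illustrative, not the actual reorg wait loop):

package ddlsketch

import "context"

// waitReorgDone waits either for the reorg result or for the step context to
// be canceled by a cancel/pause request; no separate job-state channel is
// needed once cancellation flows through the context.
func waitReorgDone(ctx context.Context, done <-chan error) error {
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}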
23 changes: 17 additions & 6 deletions pkg/ddl/foreign_key.go
@@ -69,7 +69,7 @@ func (w *worker) onCreateForeignKey(jobCtx *jobContext, job *model.Job) (ver int
return ver, nil
case model.StateWriteOnly:
delayForAsyncCommit()
err = checkForeignKeyConstrain(w, job.SchemaName, tblInfo.Name.L, fkInfo, fkCheck)
err = checkForeignKeyConstrain(jobCtx.stepCtx, w, job.SchemaName, tblInfo.Name.L, fkInfo, fkCheck)
if err != nil {
job.State = model.JobStateRollingback
return ver, err
@@ -224,7 +224,7 @@ func checkTableForeignKeyValidInOwner(jobCtx *jobContext, job *model.Job, tbInfo
if fk.RefSchema.L == job.SchemaName && fk.RefTable.L == tbInfo.Name.L {
referTableInfo = tbInfo
} else {
referTable, err := is.TableByName(jobCtx.ctx, fk.RefSchema, fk.RefTable)
referTable, err := is.TableByName(jobCtx.stepCtx, fk.RefSchema, fk.RefTable)
if err != nil {
if !fkCheck && (infoschema.ErrTableNotExists.Equal(err) || infoschema.ErrDatabaseNotExists.Equal(err)) {
continue
@@ -241,7 +241,7 @@ }
}
referredFKInfos := is.GetTableReferredForeignKeys(job.SchemaName, tbInfo.Name.L)
for _, referredFK := range referredFKInfos {
childTable, err := is.TableByName(jobCtx.ctx, referredFK.ChildSchema, referredFK.ChildTable)
childTable, err := is.TableByName(jobCtx.stepCtx, referredFK.ChildSchema, referredFK.ChildTable)
if err != nil {
return false, err
}
@@ -617,7 +617,7 @@ func checkDatabaseHasForeignKeyReferredInOwner(jobCtx *jobContext, job *model.Jo
return nil
}
is := jobCtx.infoCache.GetLatest()
err = checkDatabaseHasForeignKeyReferred(jobCtx.ctx, is, pmodel.NewCIStr(job.SchemaName), fkCheck)
err = checkDatabaseHasForeignKeyReferred(jobCtx.stepCtx, is, pmodel.NewCIStr(job.SchemaName), fkCheck)
if err != nil {
job.State = model.JobStateCancelled
}
@@ -670,7 +670,13 @@ func checkAddForeignKeyValidInOwner(infoCache *infoschema.InfoCache, schema stri
return nil
}

func checkForeignKeyConstrain(w *worker, schema, table string, fkInfo *model.FKInfo, fkCheck bool) error {
func checkForeignKeyConstrain(
ctx context.Context,
w *worker,
schema, table string,
fkInfo *model.FKInfo,
fkCheck bool,
) error {
if !fkCheck {
return nil
}
Expand Down Expand Up @@ -718,7 +724,12 @@ func checkForeignKeyConstrain(w *worker, schema, table string, fkInfo *model.FKI
}
buf.WriteString(" from %n.%n ) limit 1")
paramsList = append(paramsList, fkInfo.RefSchema.L, fkInfo.RefTable.L)
rows, _, err := sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(w.ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, buf.String(), paramsList...)
rows, _, err := sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(
ctx,
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
buf.String(),
paramsList...,
)
if err != nil {
return errors.Trace(err)
}