Skip to content

Commit

Permalink
*: support canceling DDL statements with KILL (#35803)
Browse files Browse the repository at this point in the history
close #24144
  • Loading branch information
morgo authored Jun 29, 2022
1 parent 5e547b5 commit 41c0ab4
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 1 deletion.
5 changes: 5 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,11 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {

var historyJob *model.Job
jobID := job.ID

// Attach the context of the jobId to the calling session so that
// KILL can cancel this DDL job.
ctx.GetSessionVars().StmtCtx.DDLJobID = jobID

// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// For every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
// But we use etcd to speed up, normally it takes less than 0.5s now, so we use 0.5s or 1s or 3s as the max value.
Expand Down
5 changes: 4 additions & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return err
}

defer func() { e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false }()
defer func() {
e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false
e.ctx.GetSessionVars().StmtCtx.DDLJobID = 0
}()

switch x := e.stmt.(type) {
case *ast.AlterDatabaseStmt:
Expand Down
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/blacktear23/go-proxyprotocol"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -718,6 +719,25 @@ func killConn(conn *clientConn) {
conn.mu.RLock()
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()

// If the connection being killed is a DDL Job,
// we need to CANCEL the matching jobID first.
if sessVars.StmtCtx.IsDDLJobInQueue {
jobID := sessVars.StmtCtx.DDLJobID
err := kv.RunInNewTxn(context.Background(), conn.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
// errs is the error per job, there is only one submitted
// err is the error of the overall task
errs, err := ddl.CancelJobs(txn, []int64{jobID})
if len(errs) > 0 {
logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
}
return err
})
if err != nil {
logutil.BgLogger().Warn("could not cancel DDL job", zap.Error(err))
}
}

if cancelFunc != nil {
cancelFunc()
}
Expand Down
1 change: 1 addition & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type StatementContext struct {
// IsDDLJobInQueue is used to mark whether the DDL job is put into the queue.
// If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker.
IsDDLJobInQueue bool
DDLJobID int64
InInsertStmt bool
InUpdateStmt bool
InDeleteStmt bool
Expand Down

0 comments on commit 41c0ab4

Please sign in to comment.