From d7e60c4d8d1492f7e01dad1bf3db56595bb8ea66 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 3 Dec 2024 20:22:04 +0800 Subject: [PATCH 1/2] ddl: handle context done after sending DDL jobs --- pkg/ddl/ddl.go | 1 + pkg/ddl/executor.go | 11 +++++++++-- pkg/ddl/job_submitter_test.go | 14 ++++++++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 35d7da809e2b1..2b57cde0828c2 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -1005,6 +1005,7 @@ func (d *ddl) close() { startTime := time.Now() d.cancel() + failpoint.InjectCall("afterDDLCloseCancel") d.wg.Wait() // when run with real-tikv, the lifecycle of ownerManager is managed by globalOwnerManager, // when run with uni-store BreakCampaignLoop is same as Close. diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index e14caea413c91..d26c14b43bc48 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -6585,8 +6585,15 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re } }) - // worker should restart to continue handling tasks in limitJobCh, and send back through jobW.err - result := <-jobW.ResultCh[0] + var result jobSubmitResult + select { + case <-e.ctx.Done(): + logutil.DDLLogger().Info("DoDDLJob will quit because context done") + return context.Canceled + case res := <-jobW.ResultCh[0]: + // worker should restart to continue handling tasks in limitJobCh, and send back through jobW.err + result = res + } // job.ID must be allocated after previous channel receive returns nil. jobID, err := result.jobID, result.err defer e.delJobDoneCh(jobID) diff --git a/pkg/ddl/job_submitter_test.go b/pkg/ddl/job_submitter_test.go index e3ed815a8aaa7..da514314d1a6d 100644 --- a/pkg/ddl/job_submitter_test.go +++ b/pkg/ddl/job_submitter_test.go @@ -581,3 +581,17 @@ func TestGenGIDAndInsertJobsWithRetryOnErr(t *testing.T) { require.True(t, ok) require.Equal(t, newGID-1, jobs[0].TableID) } + +func TestSubmitJobAfterDDLIsClosed(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t, mockstore.WithStoreType(mockstore.EmbedUnistore)) + tk := testkit.NewTestKit(t, store) + + var ddlErr error + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDDLCloseCancel", func() { + ddlErr = tk.ExecToErr("create database test2;") + }) + err := dom.DDL().Stop() + require.NoError(t, err) + require.Error(t, ddlErr) + require.Equal(t, "context canceled", ddlErr.Error()) +} From 3abf282c1ec305756aabdede7a4a7041741e71c0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 4 Dec 2024 00:05:16 +0800 Subject: [PATCH 2/2] address comment --- pkg/ddl/executor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index d26c14b43bc48..d954ade9cbfba 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -6589,7 +6589,7 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re select { case <-e.ctx.Done(): logutil.DDLLogger().Info("DoDDLJob will quit because context done") - return context.Canceled + return e.ctx.Err() case res := <-jobW.ResultCh[0]: // worker should restart to continue handling tasks in limitJobCh, and send back through jobW.err result = res @@ -6663,7 +6663,7 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re ticker = updateTickerInterval(ticker, 10*e.lease, ddlAction, i) case <-e.ctx.Done(): logutil.DDLLogger().Info("DoDDLJob will quit because context done") - return context.Canceled + return e.ctx.Err() } // If the connection being killed, we need to CANCEL the DDL job.