diff --git a/ddl/ddl.go b/ddl/ddl.go index ad41c3b53abac..7dac714e2473c 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -527,6 +527,10 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error { d.limitJobCh <- task // worker should restart to continue handling tasks in limitJobCh, and send back through task.err err := <-task.err + if err != nil { + // The transaction of enqueuing job is failed. + return errors.Trace(err) + } ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 326c69cccbfd0..72df6ac1d2788 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -285,6 +285,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { return errors.Trace(err) } } + failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr")) + } + }) return nil }) var jobs string @@ -294,7 +299,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, task.job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) } - logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs)) + if err != nil { + logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err)) + } else { + logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs)) + } } // getHistoryDDLJob gets a DDL job with job's ID from history queue. diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 6e745820b04b9..830254ea76302 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -476,6 +476,32 @@ func (s *testDDLSuite) TestColumnError(c *C) { doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionDropColumns, []interface{}{[]model.CIStr{model.NewCIStr("c5"), model.NewCIStr("c6")}, make([]bool, 2)}, ctx, d) } +func (s *testDDLSerialSuite) TestAddBatchJobError(c *C) { + store := testCreateStore(c, "test_add_batch_job_error") + defer func() { + err := store.Close() + c.Assert(err, IsNil) + }() + d, err := testNewDDLAndStart( + context.Background(), + WithStore(store), + WithLease(testLease), + ) + c.Assert(err, IsNil) + defer func() { + err := d.Stop() + c.Assert(err, IsNil) + }() + ctx := testNewContext(d) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr", `return(true)`), IsNil) + // Test the job runner should not hang forever. + job := &model.Job{SchemaID: 1, TableID: 1} + err = d.doDDLJob(ctx, job) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "mockAddBatchDDLJobsErr") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr"), IsNil) +} + func testCheckOwner(c *C, d *ddl, expectedVal bool) { c.Assert(d.isOwner(), Equals, expectedVal) }