diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 871dc74153d7..870fd146ce37 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -3207,19 +3207,20 @@ func TestPauseReason(t *testing.T) { } } -// TestNonCancelableJobsRetry tests that a non-cancelable job is retried when -// failed with a non-retryable error. -func TestNonCancelableJobsRetry(t *testing.T) { +// TestJobsRetryWhileReverting tests that a reverting job is always retried. +func TestJobsRetryWhileReverting(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Create a non-cancelable job. + + // Create a job. // Fail the job in resume to cause the job to revert. // Fail the job in revert state using a non-retryable error. // Make sure that the jobs is retried and is again in the revert state. + rts := registryTestSuite{} rts.setUp(t) defer rts.tearDown() - // Make mockJob non-cancelable. + // Make mockJob non-cancelable, ensuring that non-cancelable jobs are also retried. rts.mockJob.SetNonCancelable(rts.ctx, func(ctx context.Context, nonCancelable bool) bool { return true }) @@ -3243,7 +3244,7 @@ func TestNonCancelableJobsRetry(t *testing.T) { rts.check(t, jobs.StatusReverting) // Fail the job in reverting state without a retryable error. - rts.failOrCancelCh <- errors.New("failing with non-retryable error") + rts.failOrCancelCh <- errors.New("failing with a non-retryable error") rts.mu.e.OnFailOrCancelExit = true // Job should be retried even though it is non-cancelable. 
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index f7be4cfc5cc1..9d7b97e43159 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -1155,20 +1156,6 @@ func (r retryJobError) Error() string { return string(r) } -// Registry does not retry a job that fails due to a permanent error. -var errJobPermanentSentinel = errors.New("permanent job-error") - -// MarkAsPermanentJobError marks an error as a permanent job error, which indicates -// Registry to not retry the job when it fails due to this error. -func MarkAsPermanentJobError(err error) error { - return errors.Mark(err, errJobPermanentSentinel) -} - -// IsPermanentJobError checks whether the given error is a permanent error. -func IsPermanentJobError(err error) bool { - return errors.Is(err, errJobPermanentSentinel) -} - // stepThroughStateMachine implements the state machine of the job lifecycle. // The job is executed with the ctx, so ctx must only be canceled if the job // should also be canceled. 
resultsCh is passed to the resumable func and should @@ -1270,6 +1257,9 @@ func (r *Registry) stepThroughStateMachine( defer jm.CurrentlyRunning.Dec(1) err = resumer.OnFailOrCancel(onFailOrCancelCtx, execCtx) }() + if r.knobs.ModifyErrorAfterOnFailOrCancel != nil { + err = r.knobs.ModifyErrorAfterOnFailOrCancel(job.ID(), err) + } if successOnFailOrCancel := err == nil; successOnFailOrCancel { jm.FailOrCancelCompleted.Inc(1) // If the job has failed with any error different than canceled we @@ -1290,11 +1280,13 @@ func (r *Registry) stepThroughStateMachine( jm.FailOrCancelRetryError.Inc(1) return errors.Errorf("job %d: %s: restarting in background", job.ID(), err) } - // A non-cancelable job is always retried while reverting unless the error is marked as permanent. - if job.Payload().Noncancelable && !IsPermanentJobError(err) { + if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + // All reverting jobs are retried. jm.FailOrCancelRetryError.Inc(1) - return errors.Wrapf(err, "job %d: job is non-cancelable, restarting in background", job.ID()) + return errors.Wrapf(err, "job %d: failed to revert, restarting in background", job.ID()) } + // TODO(sajjad): Remove rest of the code in this case after v21.2. All reverting jobs + // are retried after v21.2. jm.FailOrCancelFailed.Inc(1) if sErr := (*InvalidStatusError)(nil); errors.As(err, &sErr) { if sErr.status != StatusPauseRequested { @@ -1317,6 +1309,9 @@ func (r *Registry) stepThroughStateMachine( telemetry.Inc(TelemetryMetrics[jobType].Failed) return jobErr case StatusRevertFailed: + // TODO(sajjad): Remove StatusRevertFailed and related code in other places in v22.1. + // v21.2 modified all reverting jobs to retry instead of going to revert-failed. Therefore, + // revert-failed state is not reachable after v21.2. 
if jobErr == nil { return errors.AssertionFailedf("job %d: has StatusRevertFailed but no error was provided", job.ID()) diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index ea0a1bf9c974..f3b27fdc6f0c 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -66,6 +67,10 @@ type TestingKnobs struct { // DisableAdoptions disables job adoptions. DisableAdoptions bool + + // ModifyErrorAfterOnFailOrCancel captures the error returned from OnFailOrCancel + // and sets the error to the returned value of this function. + ModifyErrorAfterOnFailOrCancel func(jobspb.JobID, error) error } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 6951316bd4c6..c5be07778f73 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -740,8 +740,7 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError( // than tables. For jobs intended to drop other types of descriptors, we do // nothing. if _, ok := desc.(catalog.TableDescriptor); !ok { - // Mark the error as permanent so that is not retried in reverting state. - return jobs.MarkAsPermanentJobError(errors.Newf("schema change jobs on databases and schemas cannot be reverted")) + return errors.Newf("schema change jobs on databases and schemas cannot be reverted") } // Check that we aren't queued behind another schema changer. @@ -2265,8 +2264,7 @@ func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interfa // unset. We cannot revert such schema changes, so just exit early with an // error. if details.DescID == descpb.InvalidID { - // Mark the error as permanent so that is not retried in reverting state. 
- return jobs.MarkAsPermanentJobError(errors.Newf("schema change jobs on databases and schemas cannot be reverted")) + return errors.Newf("schema change jobs on databases and schemas cannot be reverted") } sc := SchemaChanger{ descID: details.DescID, diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index c325a02c86bc..46051f2a7ffc 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -2187,6 +2187,12 @@ func TestSchemaUniqueColumnDropFailure(t *testing.T) { StartupMigrationManager: &startupmigrations.MigrationManagerTestingKnobs{ DisableBackfillMigrations: true, }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error { + // Allow jobs to terminate. + return nil } server, sqlDB, kvDB := serverutils.StartServer(t, params) defer server.Stopper().Stop(context.Background()) @@ -6365,7 +6371,7 @@ SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNI require.Regexp(t, "violates unique constraint", jobError) } -// TestPermanentErrorDuringRollback tests that a permanent error while rolling +// TestPermanentErrorDuringRollback tests that an error while rolling // back a schema change causes the job to fail, and that the appropriate error // is displayed in the jobs table. func TestPermanentErrorDuringRollback(t *testing.T) { @@ -6373,8 +6379,21 @@ func TestPermanentErrorDuringRollback(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() + var sqlDB *gosql.DB + validateError := func(jobID jobspb.JobID, err error) error { + var jobIDFromTable jobspb.JobID + row := sqlDB.QueryRow("SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") + assert.NoError(t, row.Scan(&jobIDFromTable)) + if jobIDFromTable == jobID { + assert.Error(t, err, "the job must fail with an error") + // Allow the job to terminate. 
+ return nil + } + return err + } runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) { - s, sqlDB, _ := serverutils.StartServer(t, params) + var s serverutils.TestServerInterface + s, sqlDB, _ = serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) _, err := sqlDB.Exec(` @@ -6394,7 +6413,7 @@ CREATE UNIQUE INDEX i ON t.test(v); var jobErr string row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") require.NoError(t, row.Scan(&jobID, &jobErr)) - require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr) + require.Regexp(t, `violates unique constraint "i"`, jobErr) if gcJobRecord { _, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID) @@ -6427,9 +6446,11 @@ CREATE UNIQUE INDEX i ON t.test(v); return errors.New("permanent error") }, }, - // Decrease the adopt loop interval so that retries happen quickly. + // Decrease the adopt-loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = validateError // Don't GC the job record after the schema change, so we can test dropping // the table with a failed mutation job. runTest(t, params, false /* gcJobRecord */) @@ -6916,6 +6937,9 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { ctx := context.Background() t.Run("failed due to injected error", func(t *testing.T) { + const maxToRetry = 2 + var lastJobError error + retryCnt := int32(0) var s serverutils.TestServerInterface params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ @@ -6938,9 +6962,19 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { return nil }, }, - // Decrease the adopt loop interval so that retries happen quickly. + // Decrease the adopt-loop interval so that retries happen quickly. 
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, err error) error { + if retryCnt == maxToRetry { + // Let the job complete after a few retries. + lastJobError = err + return nil + } + retryCnt++ + return err + } var db *gosql.DB s, db, _ = serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -6951,15 +6985,13 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { sqlDB.Exec(t, tc.setupStmts) sqlDB.ExpectErr(t, "injected permanent error", tc.scStmt) result := sqlDB.QueryStr(t, - `SELECT status, error FROM crdb_internal.jobs WHERE description ~ $1`, + `SELECT status FROM crdb_internal.jobs WHERE description ~ $1`, tc.jobRegex) require.Len(t, result, 1) - status, jobError := result[0][0], result[0][1] - require.Equal(t, string(jobs.StatusRevertFailed), status) + require.Equal(t, string(jobs.StatusFailed), result[0][0]) require.Regexp(t, - "cannot be reverted, manual cleanup may be required: "+ - "schema change jobs on databases and schemas cannot be reverted", - jobError) + "schema change jobs on databases and schemas cannot be reverted", + lastJobError) }) } }) @@ -7015,7 +7047,7 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { return nil }, }, - // Decrease the adopt loop interval so that retries happen quickly. + // Decrease the adopt-loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } var db *gosql.DB @@ -7093,6 +7125,11 @@ func TestDropColumnAfterMutations(t *testing.T) { // Decrease the adopt loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error { + // Allow jobs to terminate. 
+ return nil + } s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx)