Skip to content

Commit

Permalink
jobs: retry all reverting jobs when they fail
Browse files Browse the repository at this point in the history
Previously, only non-cancelable reverting jobs were retried
by default. This commit makes all reverting jobs to retry when
they fail. As a result, reverting jobs do not fail due to
transient errors.

Release justification: a bug fix and high benefit changes to
existing functionality

Release note: None

Fixes: cockroachdb#66685
  • Loading branch information
Sajjad Rizvi committed Aug 25, 2021
1 parent 9388792 commit 6fb6bd7
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 39 deletions.
13 changes: 7 additions & 6 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3207,19 +3207,20 @@ func TestPauseReason(t *testing.T) {
}
}

// TestNonCancelableJobsRetry tests that a non-cancelable job is retried when
// failed with a non-retryable error.
func TestNonCancelableJobsRetry(t *testing.T) {
// TestJobsRetryWhileReverting tests that a reverting job is always retried.
func TestJobsRetryWhileReverting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Create a non-cancelable job.

// Create a job.
// Fail the job in resume to cause the job to revert.
// Fail the job in revert state using a non-retryable error.
// Make sure that the jobs is retried and is again in the revert state.

rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
// Make mockJob non-cancelable.
// Make mockJob non-cancelable, ensuring that non-cancelable jobs are also retried.
rts.mockJob.SetNonCancelable(rts.ctx, func(ctx context.Context, nonCancelable bool) bool {
return true
})
Expand All @@ -3243,7 +3244,7 @@ func TestNonCancelableJobsRetry(t *testing.T) {
rts.check(t, jobs.StatusReverting)

// Fail the job in reverting state without a retryable error.
rts.failOrCancelCh <- errors.New("failing with non-retryable error")
rts.failOrCancelCh <- errors.New("failing with a non-retryable error")
rts.mu.e.OnFailOrCancelExit = true

// Job should be retried even though it is non-cancelable.
Expand Down
29 changes: 12 additions & 17 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -1155,20 +1156,6 @@ func (r retryJobError) Error() string {
return string(r)
}

// Registry does not retry a job that fails due to a permanent error.
var errJobPermanentSentinel = errors.New("permanent job-error")

// MarkAsPermanentJobError marks an error as a permanent job error, which indicates
// Registry to not retry the job when it fails due to this error.
func MarkAsPermanentJobError(err error) error {
return errors.Mark(err, errJobPermanentSentinel)
}

// IsPermanentJobError checks whether the given error is a permanent error.
func IsPermanentJobError(err error) bool {
return errors.Is(err, errJobPermanentSentinel)
}

// stepThroughStateMachine implements the state machine of the job lifecycle.
// The job is executed with the ctx, so ctx must only be canceled if the job
// should also be canceled. resultsCh is passed to the resumable func and should
Expand Down Expand Up @@ -1270,6 +1257,9 @@ func (r *Registry) stepThroughStateMachine(
defer jm.CurrentlyRunning.Dec(1)
err = resumer.OnFailOrCancel(onFailOrCancelCtx, execCtx)
}()
if r.knobs.ModifyErrorAfterOnFailOrCancel != nil {
err = r.knobs.ModifyErrorAfterOnFailOrCancel(job.ID(), err)
}
if successOnFailOrCancel := err == nil; successOnFailOrCancel {
jm.FailOrCancelCompleted.Inc(1)
// If the job has failed with any error different than canceled we
Expand All @@ -1290,11 +1280,13 @@ func (r *Registry) stepThroughStateMachine(
jm.FailOrCancelRetryError.Inc(1)
return errors.Errorf("job %d: %s: restarting in background", job.ID(), err)
}
// A non-cancelable job is always retried while reverting unless the error is marked as permanent.
if job.Payload().Noncancelable && !IsPermanentJobError(err) {
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// All reverting jobs are retried.
jm.FailOrCancelRetryError.Inc(1)
return errors.Wrapf(err, "job %d: job is non-cancelable, restarting in background", job.ID())
return errors.Wrapf(err, "job %d: failed to revert, restarting in background", job.ID())
}
// TODO(sajjad): Remove rest of the code in this case after v21.2. All reverting jobs
// are retried after v21.2.
jm.FailOrCancelFailed.Inc(1)
if sErr := (*InvalidStatusError)(nil); errors.As(err, &sErr) {
if sErr.status != StatusPauseRequested {
Expand All @@ -1317,6 +1309,9 @@ func (r *Registry) stepThroughStateMachine(
telemetry.Inc(TelemetryMetrics[jobType].Failed)
return jobErr
case StatusRevertFailed:
// TODO(sajjad): Remove StatusRevertFailed and related code in other places in v22.1.
// v21.2 modified all reverting jobs to retry instead of go to revert-failed. Therefore,
// revert-failed state is not reachable after 21.2.
if jobErr == nil {
return errors.AssertionFailedf("job %d: has StatusRevertFailed but no error was provided",
job.ID())
Expand Down
5 changes: 5 additions & 0 deletions pkg/jobs/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -66,6 +67,10 @@ type TestingKnobs struct {

// DisableAdoptions disables job adoptions.
DisableAdoptions bool

// ModifyErrorAfterOnFailOrCancel captures the error returned from OnFailOorCancel
// and sets the error to the returned value of this function.
ModifyErrorAfterOnFailOrCancel func(jobspb.JobID, error) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,7 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError(
// than tables. For jobs intended to drop other types of descriptors, we do
// nothing.
if _, ok := desc.(catalog.TableDescriptor); !ok {
// Mark the error as permanent so that is not retried in reverting state.
return jobs.MarkAsPermanentJobError(errors.Newf("schema change jobs on databases and schemas cannot be reverted"))
return errors.Newf("schema change jobs on databases and schemas cannot be reverted")
}

// Check that we aren't queued behind another schema changer.
Expand Down Expand Up @@ -2265,8 +2264,7 @@ func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interfa
// unset. We cannot revert such schema changes, so just exit early with an
// error.
if details.DescID == descpb.InvalidID {
// Mark the error as permanent so that is not retried in reverting state.
return jobs.MarkAsPermanentJobError(errors.Newf("schema change jobs on databases and schemas cannot be reverted"))
return errors.Newf("schema change jobs on databases and schemas cannot be reverted")
}
sc := SchemaChanger{
descID: details.DescID,
Expand Down
61 changes: 49 additions & 12 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,12 @@ func TestSchemaUniqueColumnDropFailure(t *testing.T) {
StartupMigrationManager: &startupmigrations.MigrationManagerTestingKnobs{
DisableBackfillMigrations: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error {
// Allow jobs to terminate.
return nil
}
server, sqlDB, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())
Expand Down Expand Up @@ -6365,16 +6371,29 @@ SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNI
require.Regexp(t, "violates unique constraint", jobError)
}

// TestPermanentErrorDuringRollback tests that a permanent error while rolling
// TestPermanentErrorDuringRollback tests that an error while rolling
// back a schema change causes the job to fail, and that the appropriate error
// is displayed in the jobs table.
func TestPermanentErrorDuringRollback(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

var sqlDB *gosql.DB
validateError := func(jobID jobspb.JobID, err error) error {
var jobIDFromTable jobspb.JobID
row := sqlDB.QueryRow("SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
assert.NoError(t, row.Scan(&jobIDFromTable))
if jobIDFromTable == jobID {
assert.Error(t, err, "the job must fail with an error")
// Allow the job to terminate.
return nil
}
return err
}
runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) {
s, sqlDB, _ := serverutils.StartServer(t, params)
var s serverutils.TestServerInterface
s, sqlDB, _ = serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

_, err := sqlDB.Exec(`
Expand All @@ -6394,7 +6413,7 @@ CREATE UNIQUE INDEX i ON t.test(v);
var jobErr string
row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobID, &jobErr))
require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr)
require.Regexp(t, `violates unique constraint "i"`, jobErr)

if gcJobRecord {
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID)
Expand Down Expand Up @@ -6427,9 +6446,11 @@ CREATE UNIQUE INDEX i ON t.test(v);
return errors.New("permanent error")
},
},
// Decrease the adopt loop interval so that retries happen quickly.
// Decrease the adopt-loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = validateError
// Don't GC the job record after the schema change, so we can test dropping
// the table with a failed mutation job.
runTest(t, params, false /* gcJobRecord */)
Expand Down Expand Up @@ -6916,6 +6937,9 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
ctx := context.Background()

t.Run("failed due to injected error", func(t *testing.T) {
const maxToRetry = 2
var lastJobError error
retryCnt := int32(0)
var s serverutils.TestServerInterface
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
Expand All @@ -6938,9 +6962,19 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
return nil
},
},
// Decrease the adopt loop interval so that retries happen quickly.
// Decrease the adopt-loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, err error) error {
if retryCnt == maxToRetry {
// Let the job complete after a few retries.
lastJobError = err
return nil
}
retryCnt++
return err
}
var db *gosql.DB
s, db, _ = serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
Expand All @@ -6951,15 +6985,13 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
sqlDB.Exec(t, tc.setupStmts)
sqlDB.ExpectErr(t, "injected permanent error", tc.scStmt)
result := sqlDB.QueryStr(t,
`SELECT status, error FROM crdb_internal.jobs WHERE description ~ $1`,
`SELECT status FROM crdb_internal.jobs WHERE description ~ $1`,
tc.jobRegex)
require.Len(t, result, 1)
status, jobError := result[0][0], result[0][1]
require.Equal(t, string(jobs.StatusRevertFailed), status)
require.Equal(t, string(jobs.StatusFailed), result[0][0])
require.Regexp(t,
"cannot be reverted, manual cleanup may be required: "+
"schema change jobs on databases and schemas cannot be reverted",
jobError)
"schema change jobs on databases and schemas cannot be reverted",
lastJobError)
})
}
})
Expand Down Expand Up @@ -7015,7 +7047,7 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
return nil
},
},
// Decrease the adopt loop interval so that retries happen quickly.
// Decrease the adopt-loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
var db *gosql.DB
Expand Down Expand Up @@ -7093,6 +7125,11 @@ func TestDropColumnAfterMutations(t *testing.T) {
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error {
// Allow jobs to terminate.
return nil
}

s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
Expand Down

0 comments on commit 6fb6bd7

Please sign in to comment.