Skip to content

Commit

Permalink
jobs: retry jobs with exponential-backoff
Browse files Browse the repository at this point in the history
Failed jobs were being retried with a constant interval in the previous
implementation. This commit enables jobs to be retried with exponentially
increasing delays with an upper bound. This change enables to retry the jobs
that are not currently retried when they fail due to transient problems.

Release note: None

Fixes: cockroachdb#44594
  • Loading branch information
Sajjad Rizvi committed Aug 2, 2021
1 parent 89bb4c7 commit 4d99a0b
Show file tree
Hide file tree
Showing 41 changed files with 1,798 additions and 147 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-120 set the active cluster version in the format '<major>.<minor>'
version version 21.1-122 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-120</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-122</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,10 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
},
}
}

// The initial restore will return an error, and restart.
sqlDBRestore.ExpectErr(t, `injected error: restarting in background`, `RESTORE FROM $1`, LocalFoo)
// Reduce retry delays.
sqlDBRestore.Exec(t, "SET CLUSTER SETTING jobs.registry.retry.initial_delay = '1ms'")
// Expect the restore to succeed.
sqlDBRestore.CheckQueryResultsRetry(t,
`SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' AND status = 'succeeded'`,
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ const (
DatabaseRoleSettings
// TenantUsageTable adds the system table for tracking tenant usage.
TenantUsageTable
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -427,6 +429,11 @@ var versionsSingleton = keyedVersions{
Key: TenantUsageTable,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 120},
},
{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 122},
},

// Step (2): Add new versions here.
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
Expand Down
133 changes: 111 additions & 22 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strconv"
"sync"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -43,16 +44,16 @@ const (
// NonTerminalStatusTupleString is a sql tuple corresponding to statuses of
// non-terminal jobs.
NonTerminalStatusTupleString = `(` + nonTerminalStatusList + `)`
)

const claimQuery = `
claimQuery = `
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
WHERE (claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `)
WHERE ((claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `))
ORDER BY created DESC
LIMIT $3
RETURNING id;`
)

func (r *Registry) maybeDumpTrace(
resumerCtx context.Context, resumer Resumer, jobID, traceID int64, jobErr error,
Expand Down Expand Up @@ -96,26 +97,81 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
}
numRows, err := r.ex.Exec(
ctx, "claim-jobs", txn, claimQuery,
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop,
)
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
r.metrics.ClaimedJobs.Inc(int64(numRows))
if log.ExpensiveLogEnabled(ctx, 1) || numRows > 0 {
log.Infof(ctx, "claimed %d jobs", numRows)
}
return nil
})
}

const (
// processQueryStatusTupleString includes the states of a job in which a
// job can be claimed and resumed.
processQueryStatusTupleString = `(` +
`'` + string(StatusRunning) + `', ` +
`'` + string(StatusReverting) + `'` +
`)`

// canRunArgs are used in canRunClause, which specify whether a job can be
// run now or not.
canRunArgs = `(SELECT $3 AS ts, $4 AS initial_delay, $5 AS max_delay) args`
canRunClause = `
args.ts >= COALESCE(last_run, created) + least(
IF(
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT >= 0.0,
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT,
args.max_delay
),
args.max_delay
)::INTERVAL
`
// processQueryBase and processQueryWhereBase select IDs of the jobs that
// can be processed among the claimed jobs.
processQueryBase = `SELECT id FROM system.jobs`
processQueryWhereBase = ` status IN ` + processQueryStatusTupleString + ` AND (claim_session_id = $1 AND claim_instance_id = $2)`

processQueryWithoutBackoff = processQueryBase + " WHERE " + processQueryWhereBase
processQueryWithBackoff = processQueryBase + ", " + canRunArgs +
" WHERE " + processQueryWhereBase + " AND " + canRunClause

resumeQueryBaseCols = "status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)"
resumeQueryWhereBase = `id = $1 AND claim_session_id = $2`
resumeQueryWithoutBackoff = `SELECT ` + resumeQueryBaseCols + ` FROM system.jobs WHERE ` + resumeQueryWhereBase
resumeQueryWithBackoff = `SELECT ` + resumeQueryBaseCols + `, ` + canRunClause + ` AS can_run` +
` FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase
)

// getProcessQuery returns the query that selects the jobs that are claimed
// by this node.
func getProcessQuery(
ctx context.Context, s sqlliveness.Session, r *Registry,
) (string, []interface{}) {
// Select the running or reverting jobs that this node has claimed.
query := processQueryWithoutBackoff
args := []interface{}{s.ID().UnsafeBytes(), r.ID()}
// Gating the version that introduced job retries with exponential backoff.
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// Select only those jobs that can be executed right now.
query = processQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
return query, args
}

// processClaimedJobs processes all jobs currently claimed by the registry.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
query, args := getProcessQuery(ctx, s, r)

it, err := r.ex.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`,
StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, query, args...,
)
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
Expand All @@ -134,7 +190,6 @@ WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
}

r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume)
r.resumeClaimedJobs(ctx, s, claimedToResume)
return nil
Expand Down Expand Up @@ -190,12 +245,18 @@ func (r *Registry) filterAlreadyRunningAndCancelFromPreviousSessions(
// resumeJob resumes a claimed job.
func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliveness.Session) error {
log.Infof(ctx, "job %d: resuming execution", jobID)
resumeQuery := resumeQueryWithoutBackoff
args := []interface{}{jobID, s.ID().UnsafeBytes()}
backoffIsActive := r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff)
if backoffIsActive {
resumeQuery = resumeQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
row, err := r.ex.QueryRowEx(
ctx, "get-job-row", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)
FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
jobID, s.ID().UnsafeBytes(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, resumeQuery, args...,
)
if err != nil {
return errors.Wrapf(err, "job %d: could not query job table row", jobID)
Expand All @@ -218,6 +279,13 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
return errors.Errorf("job %d: claim with session id %s has expired", jobID, s.ID())
}

if backoffIsActive {
// It's too soon to run the job.
if !(*row[4].(*tree.DBool)) {
return nil
}
}

payload, err := UnmarshalPayload(row[1])
if err != nil {
return err
Expand Down Expand Up @@ -248,6 +316,7 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,

aj := &adoptedJob{sid: s.ID(), cancel: cancel}
r.addAdoptedJob(jobID, aj)
r.metrics.ResumedJobs.Inc(1)
if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
// Wait for the job to finish. No need to print the error because if there
// was one it's been set in the job status already.
Expand Down Expand Up @@ -334,16 +403,38 @@ func (r *Registry) runJob(
return err
}

const cancelQuery = `
const (
cancelQueryUpdate = `
UPDATE system.jobs
SET status =
CASE
WHEN status = $1 THEN $2
WHEN status = $3 THEN $4
ELSE status
END
WHERE (status IN ($1, $3)) AND ((claim_session_id = $5) AND (claim_instance_id = $6))
END`
cancelQueryWhere = `
WHERE (status IN ($1, $3))
AND ((claim_session_id = $5) AND (claim_instance_id = $6))
RETURNING id, status`
)

func getCancelQuery(
ctx context.Context, s sqlliveness.Session, r *Registry,
) (string, []interface{}) {
var query string
args := []interface{}{
StatusPauseRequested, StatusPaused,
StatusCancelRequested, StatusReverting,
s.ID().UnsafeBytes(), r.ID(),
}
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
query = cancelQueryUpdate + ", num_runs = 0, last_run = $7" + cancelQueryWhere
args = append(args, r.clock.Now().GoTime())
} else {
query = cancelQueryUpdate + cancelQueryWhere
}
return query, args
}

func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand All @@ -352,16 +443,14 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
return errors.WithAssertionFailure(err)
}
stmt, args := getCancelQuery(ctx, s, r)
// Note that we have to buffer all rows first - before processing each
// job - because we have to make sure that the query executes without an
// error (otherwise, the system.jobs table might diverge from the jobs
// registry).
rows, err := r.ex.QueryBufferedEx(
ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
cancelQuery,
StatusPauseRequested, StatusPaused,
StatusCancelRequested, StatusReverting,
s.ID().UnsafeBytes(), r.ID(),
stmt, args...,
)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
Expand Down
69 changes: 49 additions & 20 deletions pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,45 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const intervalBaseSettingKey = "jobs.registry.interval.base"
const adoptIntervalSettingKey = "jobs.registry.interval.adopt"
const cancelIntervalSettingKey = "jobs.registry.interval.cancel"
const gcIntervalSettingKey = "jobs.registry.interval.gc"
const retentionTimeSettingKey = "jobs.retention_time"
const cancelUpdateLimitKey = "jobs.cancel_update_limit"
const (
intervalBaseSettingKey = "jobs.registry.interval.base"
adoptIntervalSettingKey = "jobs.registry.interval.adopt"
cancelIntervalSettingKey = "jobs.registry.interval.cancel"
gcIntervalSettingKey = "jobs.registry.interval.gc"
retentionTimeSettingKey = "jobs.retention_time"
cancelUpdateLimitKey = "jobs.cancel_update_limit"
retryInitialDelaySettingKey = "jobs.registry.retry.initial_delay"
retryMaxDelaySettingKey = "jobs.registry.retry.max_delay"
)

var (
// defaultAdoptInterval is the default adopt interval.
defaultAdoptInterval = 30 * time.Second

// defaultAdoptInterval is the default adopt interval.
var defaultAdoptInterval = 30 * time.Second
// defaultCancelInterval is the default cancel interval.
defaultCancelInterval = 10 * time.Second

// defaultCancelInterval is the default cancel interval.
var defaultCancelInterval = 10 * time.Second
// defaultGcInterval is the default GC Interval.
defaultGcInterval = 1 * time.Hour

// defaultGcInterval is the default GC Interval.
var defaultGcInterval = 1 * time.Hour
// defaultIntervalBase is the default interval base.
defaultIntervalBase = 1.0

// defaultIntervalBase is the default interval base.
var defaultIntervalBase = 1.0
// defaultRetentionTime is the default duration for which terminal jobs are
// kept in the records.
defaultRetentionTime = 14 * 24 * time.Hour

// defaultRetentionTime is the default duration for which terminal jobs are
// kept in the records.
var defaultRetentionTime = 14 * 24 * time.Hour
// defaultCancellationsUpdateLimit is the default number of jobs that can be
// updated when canceling jobs concurrently from dead sessions.
defaultCancellationsUpdateLimit int64 = 1000

// defaultCancellationsUpdateLimit is the default number of jobs that can be
// updated when canceling jobs concurrently from dead sessions.
var defaultCancellationsUpdateLimit int64 = 1000
// defaultRetryInitialDelay is the initial delay in the calculation of exponentially
// increasing delays to retry failed jobs.
defaultRetryInitialDelay = 30 * time.Second

// defaultRetryMaxDelay is the maximum delay to retry a failed job.
defaultRetryMaxDelay = 24 * time.Hour
)

var (
intervalBaseSetting = settings.RegisterFloatSetting(
Expand Down Expand Up @@ -93,6 +106,22 @@ var (
defaultCancellationsUpdateLimit,
settings.NonNegativeInt,
)

retryInitialDelaySetting = settings.RegisterDurationSetting(
retryInitialDelaySettingKey,
"the starting duration of exponential-backoff delay"+
" to retry a job which encountered a retryable error or had its coordinator"+
" fail. The delay doubles after each retry.",
defaultRetryInitialDelay,
settings.NonNegativeDuration,
)

retryMaxDelaySetting = settings.RegisterDurationSetting(
retryMaxDelaySettingKey,
"the maximum duration by which a job can be delayed to retry",
defaultRetryMaxDelay,
settings.PositiveDuration,
)
)

// jitter adds a small jitter in the given duration.
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (j *Job) Succeeded(ctx context.Context) error {
var (
AdoptQuery = claimQuery

CancelQuery = cancelQuery
CancelQuery = cancelQueryUpdate

GcQuery = expiredJobsQuery

Expand Down
Loading

0 comments on commit 4d99a0b

Please sign in to comment.