jobs: retry failed jobs with exponential-backoff
Previously, failed jobs were retried at a constant interval. This commit
enables jobs to be retried with exponentially increasing delays, capped at an
upper bound. This makes it possible to retry jobs that are currently not
retried when they fail due to transient problems.

Release note: None

Fixes: cockroachdb#44594
Sajjad Rizvi committed Jul 21, 2021
1 parent b4fad78 commit ea5f491
Showing 33 changed files with 1,773 additions and 127 deletions.
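
For orientation, the retry delay computed by the new SQL clauses in pkg/jobs/adopt.go below can be sketched in Go roughly as follows. This is an illustrative reading of the formula, not code from the commit; the package, function name, and signature are hypothetical.

package backoffsketch // hypothetical package, for illustration only

import "time"

// backoffDelay mirrors the delay computed by the nextRunClause in adopt.go:
// initialDelay * (2^numRuns - 1), capped at maxDelay, with the shift limited
// to 62 and a negative (overflowed) product falling back to maxDelay.
func backoffDelay(numRuns int64, initialDelay, maxDelay time.Duration) time.Duration {
	if numRuns > 62 {
		numRuns = 62
	}
	delay := initialDelay * time.Duration((int64(1)<<numRuns)-1)
	if delay < 0 || delay > maxDelay {
		return maxDelay
	}
	return delay
}

With the default initial delay of 30s, a job that has already run 3 times waits 30s * (2^3 - 1) = 3m30s before its next attempt, and the delay never exceeds the 24h default maximum.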
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -143,4 +143,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-116 set the active cluster version in the format '<major>.<minor>'
version version 21.1-118 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -144,6 +144,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-116</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-118</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -309,6 +309,8 @@ const (
// SQLStatsTable adds the system tables for storing persisted SQL statistics
// for statements and transactions.
SQLStatsTable
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff

// Step (1): Add new versions here.
)
@@ -530,6 +532,11 @@ var versionsSingleton = keyedVersions{
Key: SQLStatsTable,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 116},
},
{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 118},
},

// Step (2): Add new versions here.
}

1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
122 changes: 102 additions & 20 deletions pkg/jobs/adopt.go
@@ -15,6 +15,7 @@ import (
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -42,16 +43,16 @@ const (
// NonTerminalStatusTupleString is a sql tuple corresponding to statuses of
// non-terminal jobs.
NonTerminalStatusTupleString = `(` + nonTerminalStatusList + `)`
)

const claimQuery = `
claimQuery = `
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
WHERE (claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `)
WHERE ((claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `))
ORDER BY created DESC
LIMIT $3
RETURNING id;`
)

// claimJobs places a claim with the given SessionID to job rows that are
// available.
@@ -64,26 +65,67 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
}
numRows, err := r.ex.Exec(
ctx, "claim-jobs", txn, claimQuery,
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop,
)
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
r.metrics.ClaimedJobs.Inc(int64(numRows))
if log.ExpensiveLogEnabled(ctx, 1) || numRows > 0 {
log.Infof(ctx, "claimed %d jobs", numRows)
}
return nil
})
}

const (
processQuery = `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)
`
// Select only those jobs whose next retry time is before the current time.
// The shift amount is capped at 62 because 1 << 63 would overflow a signed 64-bit integer.
// $5 is the registry's current timestamp.
// $6 is the initial delay in the exponential-backoff calculation; the delay doubles on each run.
// $7 is the maximum retry delay.
nextRunClause = `
AND
$5 >= COALESCE(last_run, created) + least(
IF(
$6::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1) >= 0::INTERVAL,
$6::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1),
$7::INTERVAL
),
$7::INTERVAL
):::INTERVAL`
)

// getProcessQuery returns the query that selects the jobs that are claimed
// by this node.
func getProcessQuery(
ctx context.Context, s sqlliveness.Session, r *Registry,
) (string, []interface{}) {
// Select the running or reverting jobs that this node has claimed.
query := processQuery
args := []interface{}{StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID()}
// Gate on the cluster version that introduced job retries with exponential backoff.
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// Select only those jobs that can be executed right now.
query += nextRunClause
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
return query, args
}

// processClaimedJobs processes all jobs currently claimed by the registry.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
query, args := getProcessQuery(ctx, s, r)
// TODO(sajjad): To discuss: Shouldn't we close the iterator before returning
// from this function?
it, err := r.ex.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`,
StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, query, args...,
)
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
@@ -102,7 +144,6 @@ WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
}

r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume)
r.resumeClaimedJobs(ctx, s, claimedToResume)
return nil
@@ -158,12 +199,30 @@ func (r *Registry) filterAlreadyRunningAndCancelFromPreviousSessions(
// resumeJob resumes a claimed job.
func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliveness.Session) error {
log.Infof(ctx, "job %d: resuming execution", jobID)
query := `
SELECT status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)%s
FROM system.jobs WHERE id = $1 AND claim_session_id = $2`
args := []interface{}{jobID, s.ID().UnsafeBytes()}
runStats := ""
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
runStats = `
, $3 >= COALESCE(last_run, created) + least(
IF(
$4::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1) >= 0::INTERVAL,
$4::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1),
$5::INTERVAL
),
$5::INTERVAL
):::INTERVAL as can_run`
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
stmt := fmt.Sprintf(query, runStats)

row, err := r.ex.QueryRowEx(
ctx, "get-job-row", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)
FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
jobID, s.ID().UnsafeBytes(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, stmt, args...,
)
if err != nil {
return errors.Wrapf(err, "job %d: could not query job table row", jobID)
@@ -186,6 +245,13 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
return errors.Errorf("job %d: claim with session id %s has expired", jobID, s.ID())
}

if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// It's too soon to run the job.
if !(*row[4].(*tree.DBool)) {
return nil
}
}

payload, err := UnmarshalPayload(row[1])
if err != nil {
return err
@@ -216,11 +282,14 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,

aj := &adoptedJob{sid: s.ID(), cancel: cancel}
r.addAdoptedJob(jobID, aj)
r.metrics.ResumedClaimedJobs.Inc(1)
if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
// Wait for the job to finish. No need to print the error because if there
// was one it's been set in the job status already.
_ = r.runJob(resumeCtx, resumer, job, status, job.taskName())
}); err != nil {
//TODO(sajjad): To discuss: the job has already been removed from adoptedJobs
// when runJob() above returns.
r.removeAdoptedJob(jobID)
return err
}
@@ -304,7 +373,7 @@ SET status =
WHEN status = $1 THEN $2
WHEN status = $3 THEN $4
ELSE status
END
END%s
WHERE (status IN ($1, $3)) AND ((claim_session_id = $5) AND (claim_instance_id = $6))
RETURNING id, status`

Expand All @@ -315,16 +384,24 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
return errors.WithAssertionFailure(err)
}
resetRuns := ""
args := []interface{}{
StatusPauseRequested, StatusPaused,
StatusCancelRequested, StatusReverting,
s.ID().UnsafeBytes(), r.ID(),
}
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
resetRuns = ", num_runs = 0, last_run = $7"
args = append(args, r.clock.Now().GoTime())
}
cancelStmt := fmt.Sprintf(cancelQuery, resetRuns)
// Note that we have to buffer all rows first - before processing each
// job - because we have to make sure that the query executes without an
// error (otherwise, the system.jobs table might diverge from the jobs
// registry).
rows, err := r.ex.QueryBufferedEx(
ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
cancelQuery,
StatusPauseRequested, StatusPaused,
StatusCancelRequested, StatusReverting,
s.ID().UnsafeBytes(), r.ID(),
cancelStmt, args...,
)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
@@ -339,6 +416,11 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
case StatusReverting:
if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
//TODO (sajjad): To discuss: We should not unregister the job. Instead, we
// should just cancel the job's context. If we unregister the job here, this
// cancel loop may unregister the job while a concurrent adopt loop
// immediately re-registers it. In that case, the job may be canceling as
// well as reverting, which may result in unexpected behavior.
r.unregister(id)
md.Payload.Error = errJobCanceled.Error()
encodedErr := errors.EncodeError(ctx, errJobCanceled)
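
Putting the adopt.go pieces together: when the RetryJobsWithExponentialBackoff version gate is active, getProcessQuery returns processQuery with nextRunClause appended, i.e. a statement of roughly this shape (reassembled here only for readability; parameter positions as in the constants above):

SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)
AND
$5 >= COALESCE(last_run, created) + least(
  IF(
    $6::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1) >= 0::INTERVAL,
    $6::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1),
    $7::INTERVAL
  ),
  $7::INTERVAL
):::INTERVAL

Here $1/$2 are the running and reverting statuses, $3/$4 the claim session and instance, $5 the registry's current time, and $6/$7 the initial and maximum retry delays.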
68 changes: 48 additions & 20 deletions pkg/jobs/config.go
Expand Up @@ -20,32 +20,45 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const intervalBaseSettingKey = "jobs.registry.interval.base"
const adoptIntervalSettingKey = "jobs.registry.interval.adopt"
const cancelIntervalSettingKey = "jobs.registry.interval.cancel"
const gcIntervalSettingKey = "jobs.registry.interval.gc"
const retentionTimeSettingKey = "jobs.retention_time"
const cancelUpdateLimitKey = "jobs.cancel_update_limit"
const (
intervalBaseSettingKey = "jobs.registry.interval.base"
adoptIntervalSettingKey = "jobs.registry.interval.adopt"
cancelIntervalSettingKey = "jobs.registry.interval.cancel"
gcIntervalSettingKey = "jobs.registry.interval.gc"
retentionTimeSettingKey = "jobs.retention_time"
cancelUpdateLimitKey = "jobs.cancel_update_limit"
retryInitialDelaySettingKey = "jobs.registry.retry.initial_delay"
retryMaxDelaySettingKey = "jobs.registry.retry.max_delay"
)

var (
// defaultAdoptInterval is the default adopt interval.
defaultAdoptInterval = 30 * time.Second

// defaultAdoptInterval is the default adopt interval.
var defaultAdoptInterval = 30 * time.Second
// defaultCancelInterval is the default cancel interval.
defaultCancelInterval = 10 * time.Second

// defaultCancelInterval is the default cancel interval.
var defaultCancelInterval = 10 * time.Second
// defaultGcInterval is the default GC Interval.
defaultGcInterval = 1 * time.Hour

// defaultGcInterval is the default GC Interval.
var defaultGcInterval = 1 * time.Hour
// defaultIntervalBase is the default interval base.
defaultIntervalBase = 1.0

// defaultIntervalBase is the default interval base.
var defaultIntervalBase = 1.0
// defaultRetentionTime is the default duration for which terminal jobs are
// kept in the records.
defaultRetentionTime = 14 * 24 * time.Hour

// defaultRetentionTime is the default duration for which terminal jobs are
// kept in the records.
var defaultRetentionTime = 14 * 24 * time.Hour
// defaultCancellationsUpdateLimit is the default number of jobs that can be
// updated when canceling jobs concurrently from dead sessions.
defaultCancellationsUpdateLimit int64 = 1000

// defaultCancellationsUpdateLimit is the default number of jobs that can be
// updated when canceling jobs concurrently from dead sessions.
var defaultCancellationsUpdateLimit int64 = 1000
// defaultRetryInitialDelay is the initial delay in the calculation of exponentially
// increasing delays to retry failed jobs.
defaultRetryInitialDelay = 30 * time.Second

// defaultRetryMaxDelay is the maximum delay to retry a failed job.
defaultRetryMaxDelay = 24 * time.Hour
)

var (
intervalBaseSetting = settings.RegisterFloatSetting(
@@ -93,6 +106,21 @@ var (
defaultCancellationsUpdateLimit,
settings.NonNegativeInt,
)

retryInitialDelaySetting = settings.RegisterDurationSetting(
retryInitialDelaySettingKey,
"the starting duration of exponential-backoff delay "+
"to retry a failed job. The delay doubles after each retry.",
defaultRetryInitialDelay,
settings.NonNegativeDuration,
)

retryMaxDelaySetting = settings.RegisterDurationSetting(
retryMaxDelaySettingKey,
"the maximum duration by which a failed job can be delayed to retry",
defaultRetryMaxDelay,
settings.PositiveDuration,
)
)

// jitter adds a small jitter in the given duration.
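
Since the two new knobs are registered as cluster settings, an operator should be able to adjust them the same way as the existing jobs.registry.* settings; the values below are illustrative, not defaults from the commit:

SET CLUSTER SETTING jobs.registry.retry.initial_delay = '10s';
SET CLUSTER SETTING jobs.registry.retry.max_delay = '1h';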