From 4d99a0b9c2f560509d20a3a1b8efa4cb23c05596 Mon Sep 17 00:00:00 2001 From: Sajjad Rizvi Date: Thu, 24 Jun 2021 16:20:16 -0400 Subject: [PATCH] jobs: retry jobs with exponential-backoff Failed jobs were being retried with a constant interval in the previous implementation. This commit enables jobs to be retried with exponentially increasing delays with an upper bound. This change enables to retry the jobs that are not currently retried when they fail due to transient problems. Release note: None Fixes: #44594 --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- .../full_cluster_backup_restore_test.go | 3 +- pkg/clusterversion/cockroach_versions.go | 7 + pkg/clusterversion/key_string.go | 5 +- pkg/jobs/BUILD.bazel | 1 + pkg/jobs/adopt.go | 133 +++- pkg/jobs/config.go | 69 +- pkg/jobs/helpers_test.go | 2 +- pkg/jobs/jobs.go | 63 +- pkg/jobs/jobspb/jobs.pb.go | 2 +- pkg/jobs/jobspb/jobs.proto | 2 +- pkg/jobs/lease_test.go | 21 +- pkg/jobs/metrics.go | 40 + pkg/jobs/registry.go | 16 +- pkg/jobs/registry_external_test.go | 2 +- pkg/jobs/registry_test.go | 345 +++++++++ pkg/jobs/testing_knobs.go | 13 +- pkg/jobs/update.go | 68 +- pkg/migration/BUILD.bazel | 3 + pkg/migration/migrationjob/migration_job.go | 2 +- pkg/migration/migrationmanager/BUILD.bazel | 7 +- pkg/migration/migrationmanager/manager.go | 6 +- .../migrationmanager/manager_external_test.go | 9 +- pkg/migration/migrations/BUILD.bazel | 21 + pkg/migration/migrations/helpers_test.go | 24 + pkg/migration/migrations/migrations.go | 4 + .../retry_jobs_with_exponential_backoff.go | 271 +++++++ ..._with_exponential_backoff_external_test.go | 683 ++++++++++++++++++ pkg/migration/tenant_migration.go | 1 + .../{migrationmanager => }/testing_knobs.go | 23 +- pkg/server/server_sql.go | 3 +- pkg/sql/catalog/systemschema/system.go | 42 +- pkg/sql/exec_util.go | 1 + pkg/sql/lex/BUILD.bazel | 1 - pkg/sql/logictest/BUILD.bazel | 2 +- pkg/sql/logictest/logic.go | 6 +- .../testdata/logic_test/information_schema | 2 + .../logictest/testdata/logic_test/pg_catalog | 3 - pkg/sql/logictest/testdata/logic_test/system | 20 +- pkg/ts/catalog/chart_catalog.go | 15 +- 41 files changed, 1798 insertions(+), 147 deletions(-) create mode 100644 pkg/migration/migrations/helpers_test.go create mode 100644 pkg/migration/migrations/retry_jobs_with_exponential_backoff.go create mode 100644 pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go rename pkg/migration/{migrationmanager => }/testing_knobs.go (50%) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index fa94521e72e2..6b8ee4dea6f9 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -144,4 +144,4 @@ trace.datadog.project string CockroachDB the project under which traces will be trace.debug.enable boolean false if set, traces for recent requests can be seen at https:///debug/requests trace.lightstep.token string if set, traces go to Lightstep using this token trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time. -version version 21.1-120 set the active cluster version in the format '.' +version version 21.1-122 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index dcb646366245..b70b9dd3079e 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -148,6 +148,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen at https:///debug/requests trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time. -versionversion21.1-120set the active cluster version in the format '.' +versionversion21.1-122set the active cluster version in the format '.' diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index 0d5b8f071aef..beb7486428d1 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -665,9 +665,10 @@ func TestClusterRestoreFailCleanup(t *testing.T) { }, } } - // The initial restore will return an error, and restart. sqlDBRestore.ExpectErr(t, `injected error: restarting in background`, `RESTORE FROM $1`, LocalFoo) + // Reduce retry delays. + sqlDBRestore.Exec(t, "SET CLUSTER SETTING jobs.registry.retry.initial_delay = '1ms'") // Expect the restore to succeed. sqlDBRestore.CheckQueryResultsRetry(t, `SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' AND status = 'succeeded'`, diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index f63b46c1231b..df0d6a2cb5b6 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -266,6 +266,8 @@ const ( DatabaseRoleSettings // TenantUsageTable adds the system table for tracking tenant usage. TenantUsageTable + // RetryJobsWithExponentialBackoff retries failed jobs with exponential delays. + RetryJobsWithExponentialBackoff // Step (1): Add new versions here. ) @@ -427,6 +429,11 @@ var versionsSingleton = keyedVersions{ Key: TenantUsageTable, Version: roachpb.Version{Major: 21, Minor: 1, Internal: 120}, }, + { + Key: RetryJobsWithExponentialBackoff, + Version: roachpb.Version{Major: 21, Minor: 1, Internal: 122}, + }, + // Step (2): Add new versions here. } diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 42a0a34bf6d6..c9488e045b31 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -39,11 +39,12 @@ func _() { _ = x[SQLStatsTable-28] _ = x[DatabaseRoleSettings-29] _ = x[TenantUsageTable-30] + _ = x[RetryJobsWithExponentialBackoff-31] } -const _Key_name = "Start20_2NodeMembershipStatusMinPasswordLengthAbortSpanBytesCreateLoginPrivilegeHBAForNonTLSV20_2Start21_1CPutInlineReplicaVersionsreplacedTruncatedAndRangeAppliedStateMigrationreplacedPostTruncatedAndRangeAppliedStateMigrationTruncatedAndRangeAppliedStateMigrationPostTruncatedAndRangeAppliedStateMigrationSeparatedIntentsTracingVerbosityIndependentSemanticsClosedTimestampsRaftTransportPriorReadSummariesNonVotingReplicasV21_1Start21_1PLUSStart21_2JoinTokensTableAcquisitionTypeInLeaseHistorySerializeViewUDTsExpressionIndexesDeleteDeprecatedNamespaceTableDescriptorMigrationFixDescriptorsSQLStatsTableDatabaseRoleSettingsTenantUsageTable" +const _Key_name = "Start20_2NodeMembershipStatusMinPasswordLengthAbortSpanBytesCreateLoginPrivilegeHBAForNonTLSV20_2Start21_1CPutInlineReplicaVersionsreplacedTruncatedAndRangeAppliedStateMigrationreplacedPostTruncatedAndRangeAppliedStateMigrationTruncatedAndRangeAppliedStateMigrationPostTruncatedAndRangeAppliedStateMigrationSeparatedIntentsTracingVerbosityIndependentSemanticsClosedTimestampsRaftTransportPriorReadSummariesNonVotingReplicasV21_1Start21_1PLUSStart21_2JoinTokensTableAcquisitionTypeInLeaseHistorySerializeViewUDTsExpressionIndexesDeleteDeprecatedNamespaceTableDescriptorMigrationFixDescriptorsSQLStatsTableDatabaseRoleSettingsTenantUsageTableRetryJobsWithExponentialBackoff" -var _Key_index = [...]uint16{0, 9, 29, 46, 60, 80, 92, 97, 106, 116, 131, 177, 227, 265, 307, 323, 359, 388, 406, 423, 428, 441, 450, 465, 494, 511, 528, 577, 591, 604, 624, 640} +var _Key_index = [...]uint16{0, 9, 29, 46, 60, 80, 92, 97, 106, 116, 131, 177, 227, 265, 307, 323, 359, 388, 406, 423, 428, 441, 450, 465, 494, 511, 528, 577, 591, 604, 624, 640, 671} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index b811772b1da0..712a43478270 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -24,6 +24,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/jobs/jobspb", "//pkg/kv", "//pkg/roachpb:with-mocks", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 6decbd6bb469..afe3c2165843 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -16,6 +16,7 @@ import ( "strconv" "sync" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -43,16 +44,16 @@ const ( // NonTerminalStatusTupleString is a sql tuple corresponding to statuses of // non-terminal jobs. NonTerminalStatusTupleString = `(` + nonTerminalStatusList + `)` -) -const claimQuery = ` + claimQuery = ` UPDATE system.jobs SET claim_session_id = $1, claim_instance_id = $2 - WHERE (claim_session_id IS NULL) - AND (status IN ` + claimableStatusTupleString + `) + WHERE ((claim_session_id IS NULL) + AND (status IN ` + claimableStatusTupleString + `)) ORDER BY created DESC LIMIT $3 RETURNING id;` +) func (r *Registry) maybeDumpTrace( resumerCtx context.Context, resumer Resumer, jobID, traceID int64, jobErr error, @@ -96,11 +97,11 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error { } numRows, err := r.ex.Exec( ctx, "claim-jobs", txn, claimQuery, - s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop, - ) + s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop) if err != nil { return errors.Wrap(err, "could not query jobs table") } + r.metrics.ClaimedJobs.Inc(int64(numRows)) if log.ExpensiveLogEnabled(ctx, 1) || numRows > 0 { log.Infof(ctx, "claimed %d jobs", numRows) } @@ -108,14 +109,69 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error { }) } +const ( + // processQueryStatusTupleString includes the states of a job in which a + // job can be claimed and resumed. + processQueryStatusTupleString = `(` + + `'` + string(StatusRunning) + `', ` + + `'` + string(StatusReverting) + `'` + + `)` + + // canRunArgs are used in canRunClause, which specify whether a job can be + // run now or not. + canRunArgs = `(SELECT $3 AS ts, $4 AS initial_delay, $5 AS max_delay) args` + canRunClause = ` +args.ts >= COALESCE(last_run, created) + least( + IF( + args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT >= 0.0, + args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT, + args.max_delay + ), + args.max_delay +)::INTERVAL +` + // processQueryBase and processQueryWhereBase select IDs of the jobs that + // can be processed among the claimed jobs. + processQueryBase = `SELECT id FROM system.jobs` + processQueryWhereBase = ` status IN ` + processQueryStatusTupleString + ` AND (claim_session_id = $1 AND claim_instance_id = $2)` + + processQueryWithoutBackoff = processQueryBase + " WHERE " + processQueryWhereBase + processQueryWithBackoff = processQueryBase + ", " + canRunArgs + + " WHERE " + processQueryWhereBase + " AND " + canRunClause + + resumeQueryBaseCols = "status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)" + resumeQueryWhereBase = `id = $1 AND claim_session_id = $2` + resumeQueryWithoutBackoff = `SELECT ` + resumeQueryBaseCols + ` FROM system.jobs WHERE ` + resumeQueryWhereBase + resumeQueryWithBackoff = `SELECT ` + resumeQueryBaseCols + `, ` + canRunClause + ` AS can_run` + + ` FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase +) + +// getProcessQuery returns the query that selects the jobs that are claimed +// by this node. +func getProcessQuery( + ctx context.Context, s sqlliveness.Session, r *Registry, +) (string, []interface{}) { + // Select the running or reverting jobs that this node has claimed. + query := processQueryWithoutBackoff + args := []interface{}{s.ID().UnsafeBytes(), r.ID()} + // Gating the version that introduced job retries with exponential backoff. + if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + // Select only those jobs that can be executed right now. + query = processQueryWithBackoff + initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds() + maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds() + args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay) + } + return query, args +} + // processClaimedJobs processes all jobs currently claimed by the registry. func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error { + query, args := getProcessQuery(ctx, s, r) + it, err := r.ex.QueryIteratorEx( ctx, "select-running/get-claimed-jobs", nil, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, ` -SELECT id FROM system.jobs -WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`, - StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(), + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, query, args..., ) if err != nil { return errors.Wrapf(err, "could not query for claimed jobs") @@ -134,7 +190,6 @@ WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance if err != nil { return errors.Wrapf(err, "could not query for claimed jobs") } - r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume) r.resumeClaimedJobs(ctx, s, claimedToResume) return nil @@ -190,12 +245,18 @@ func (r *Registry) filterAlreadyRunningAndCancelFromPreviousSessions( // resumeJob resumes a claimed job. func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliveness.Session) error { log.Infof(ctx, "job %d: resuming execution", jobID) + resumeQuery := resumeQueryWithoutBackoff + args := []interface{}{jobID, s.ID().UnsafeBytes()} + backoffIsActive := r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) + if backoffIsActive { + resumeQuery = resumeQueryWithBackoff + initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds() + maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds() + args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay) + } row, err := r.ex.QueryRowEx( ctx, "get-job-row", nil, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, ` -SELECT status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id) -FROM system.jobs WHERE id = $1 AND claim_session_id = $2`, - jobID, s.ID().UnsafeBytes(), + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, resumeQuery, args..., ) if err != nil { return errors.Wrapf(err, "job %d: could not query job table row", jobID) @@ -218,6 +279,13 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`, return errors.Errorf("job %d: claim with session id %s has expired", jobID, s.ID()) } + if backoffIsActive { + // It's too soon to run the job. + if !(*row[4].(*tree.DBool)) { + return nil + } + } + payload, err := UnmarshalPayload(row[1]) if err != nil { return err @@ -248,6 +316,7 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`, aj := &adoptedJob{sid: s.ID(), cancel: cancel} r.addAdoptedJob(jobID, aj) + r.metrics.ResumedJobs.Inc(1) if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) { // Wait for the job to finish. No need to print the error because if there // was one it's been set in the job status already. @@ -334,16 +403,38 @@ func (r *Registry) runJob( return err } -const cancelQuery = ` +const ( + cancelQueryUpdate = ` UPDATE system.jobs SET status = CASE WHEN status = $1 THEN $2 WHEN status = $3 THEN $4 ELSE status - END -WHERE (status IN ($1, $3)) AND ((claim_session_id = $5) AND (claim_instance_id = $6)) + END` + cancelQueryWhere = ` + WHERE (status IN ($1, $3)) + AND ((claim_session_id = $5) AND (claim_instance_id = $6)) RETURNING id, status` +) + +func getCancelQuery( + ctx context.Context, s sqlliveness.Session, r *Registry, +) (string, []interface{}) { + var query string + args := []interface{}{ + StatusPauseRequested, StatusPaused, + StatusCancelRequested, StatusReverting, + s.ID().UnsafeBytes(), r.ID(), + } + if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + query = cancelQueryUpdate + ", num_runs = 0, last_run = $7" + cancelQueryWhere + args = append(args, r.clock.Now().GoTime()) + } else { + query = cancelQueryUpdate + cancelQueryWhere + } + return query, args +} func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error { return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -352,16 +443,14 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil { return errors.WithAssertionFailure(err) } + stmt, args := getCancelQuery(ctx, s, r) // Note that we have to buffer all rows first - before processing each // job - because we have to make sure that the query executes without an // error (otherwise, the system.jobs table might diverge from the jobs // registry). rows, err := r.ex.QueryBufferedEx( ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - cancelQuery, - StatusPauseRequested, StatusPaused, - StatusCancelRequested, StatusReverting, - s.ID().UnsafeBytes(), r.ID(), + stmt, args..., ) if err != nil { return errors.Wrap(err, "could not query jobs table") diff --git a/pkg/jobs/config.go b/pkg/jobs/config.go index 1d759ae96cdc..f618ec2297af 100644 --- a/pkg/jobs/config.go +++ b/pkg/jobs/config.go @@ -20,32 +20,45 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) -const intervalBaseSettingKey = "jobs.registry.interval.base" -const adoptIntervalSettingKey = "jobs.registry.interval.adopt" -const cancelIntervalSettingKey = "jobs.registry.interval.cancel" -const gcIntervalSettingKey = "jobs.registry.interval.gc" -const retentionTimeSettingKey = "jobs.retention_time" -const cancelUpdateLimitKey = "jobs.cancel_update_limit" +const ( + intervalBaseSettingKey = "jobs.registry.interval.base" + adoptIntervalSettingKey = "jobs.registry.interval.adopt" + cancelIntervalSettingKey = "jobs.registry.interval.cancel" + gcIntervalSettingKey = "jobs.registry.interval.gc" + retentionTimeSettingKey = "jobs.retention_time" + cancelUpdateLimitKey = "jobs.cancel_update_limit" + retryInitialDelaySettingKey = "jobs.registry.retry.initial_delay" + retryMaxDelaySettingKey = "jobs.registry.retry.max_delay" +) + +var ( + // defaultAdoptInterval is the default adopt interval. + defaultAdoptInterval = 30 * time.Second -// defaultAdoptInterval is the default adopt interval. -var defaultAdoptInterval = 30 * time.Second + // defaultCancelInterval is the default cancel interval. + defaultCancelInterval = 10 * time.Second -// defaultCancelInterval is the default cancel interval. -var defaultCancelInterval = 10 * time.Second + // defaultGcInterval is the default GC Interval. + defaultGcInterval = 1 * time.Hour -// defaultGcInterval is the default GC Interval. -var defaultGcInterval = 1 * time.Hour + // defaultIntervalBase is the default interval base. + defaultIntervalBase = 1.0 -// defaultIntervalBase is the default interval base. -var defaultIntervalBase = 1.0 + // defaultRetentionTime is the default duration for which terminal jobs are + // kept in the records. + defaultRetentionTime = 14 * 24 * time.Hour -// defaultRetentionTime is the default duration for which terminal jobs are -// kept in the records. -var defaultRetentionTime = 14 * 24 * time.Hour + // defaultCancellationsUpdateLimit is the default number of jobs that can be + // updated when canceling jobs concurrently from dead sessions. + defaultCancellationsUpdateLimit int64 = 1000 -// defaultCancellationsUpdateLimit is the default number of jobs that can be -// updated when canceling jobs concurrently from dead sessions. -var defaultCancellationsUpdateLimit int64 = 1000 + // defaultRetryInitialDelay is the initial delay in the calculation of exponentially + // increasing delays to retry failed jobs. + defaultRetryInitialDelay = 30 * time.Second + + // defaultRetryMaxDelay is the maximum delay to retry a failed job. + defaultRetryMaxDelay = 24 * time.Hour +) var ( intervalBaseSetting = settings.RegisterFloatSetting( @@ -93,6 +106,22 @@ var ( defaultCancellationsUpdateLimit, settings.NonNegativeInt, ) + + retryInitialDelaySetting = settings.RegisterDurationSetting( + retryInitialDelaySettingKey, + "the starting duration of exponential-backoff delay"+ + " to retry a job which encountered a retryable error or had its coordinator"+ + " fail. The delay doubles after each retry.", + defaultRetryInitialDelay, + settings.NonNegativeDuration, + ) + + retryMaxDelaySetting = settings.RegisterDurationSetting( + retryMaxDelaySettingKey, + "the maximum duration by which a job can be delayed to retry", + defaultRetryMaxDelay, + settings.PositiveDuration, + ) ) // jitter adds a small jitter in the given duration. diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go index c5baa5048ecd..f54548324c06 100644 --- a/pkg/jobs/helpers_test.go +++ b/pkg/jobs/helpers_test.go @@ -92,7 +92,7 @@ func (j *Job) Succeeded(ctx context.Context) error { var ( AdoptQuery = claimQuery - CancelQuery = cancelQuery + CancelQuery = cancelQueryUpdate GcQuery = expiredJobsQuery diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 95548579fd05..4e53b4631569 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -16,6 +16,7 @@ import ( "reflect" "sync/atomic" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" @@ -262,11 +263,16 @@ func (j *Job) started(ctx context.Context, txn *kv.Txn) error { if md.Status != StatusPending && md.Status != StatusRunning { return errors.Errorf("job with status %s cannot be marked started", md.Status) } - // TODO(spaskob): Remove this status change after we stop supporting - // pending job states. - ju.UpdateStatus(StatusRunning) - md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime()) - ju.UpdatePayload(md.Payload) + if md.Payload.StartedMicros == 0 { + // TODO(spaskob): Remove this status change after we stop supporting + // pending job states. + ju.UpdateStatus(StatusRunning) + md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime()) + ju.UpdatePayload(md.Payload) + } + if j.registry.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + ju.UpdateRunStats(md.RunStats.NumRuns+1, j.registry.clock.Now().GoTime()) + } return nil }) } @@ -482,7 +488,8 @@ func (j *Job) cancelRequested( } if md.Status == StatusPaused && md.Payload.FinalResumeError != nil { decodedErr := errors.DecodeError(ctx, *md.Payload.FinalResumeError) - return fmt.Errorf("job %d is paused and has non-nil FinalResumeError %s hence cannot be canceled and should be reverted", j.ID(), decodedErr.Error()) + return fmt.Errorf("job %d is paused and has non-nil FinalResumeError "+ + "%s hence cannot be canceled and should be reverted", j.ID(), decodedErr.Error()) } if fn != nil { if err := fn(ctx, txn); err != nil { @@ -547,29 +554,39 @@ func (j *Job) reverted( ctx context.Context, txn *kv.Txn, err error, fn func(context.Context, *kv.Txn) error, ) error { return j.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error { - if md.Status == StatusReverting { - return nil - } - if md.Status != StatusCancelRequested && md.Status != StatusRunning && md.Status != StatusPending { + if md.Status != StatusReverting && + md.Status != StatusCancelRequested && + md.Status != StatusRunning && + md.Status != StatusPending { return fmt.Errorf("job with status %s cannot be reverted", md.Status) } - if fn != nil { - if err := fn(ctx, txn); err != nil { - return err + if md.Status != StatusReverting { + if fn != nil { + if err := fn(ctx, txn); err != nil { + return err + } + } + if err != nil { + md.Payload.Error = err.Error() + encodedErr := errors.EncodeError(ctx, err) + md.Payload.FinalResumeError = &encodedErr + ju.UpdatePayload(md.Payload) + } else { + if md.Payload.FinalResumeError == nil { + return errors.AssertionFailedf( + "tried to mark job as reverting, but no error was provided or recorded") + } } + ju.UpdateStatus(StatusReverting) } - if err != nil { - md.Payload.Error = err.Error() - encodedErr := errors.EncodeError(ctx, err) - md.Payload.FinalResumeError = &encodedErr - ju.UpdatePayload(md.Payload) - } else { - if md.Payload.FinalResumeError == nil { - return errors.AssertionFailedf( - "tried to mark job as reverting, but no error was provided or recorded") + if j.registry.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + numRuns := md.RunStats.NumRuns + 1 + if md.Status != StatusReverting { + // Reset the number of runs to speed up reverting. + numRuns = 1 } + ju.UpdateRunStats(numRuns, j.registry.clock.Now().GoTime()) } - ju.UpdateStatus(StatusReverting) return nil }) } diff --git a/pkg/jobs/jobspb/jobs.pb.go b/pkg/jobs/jobspb/jobs.pb.go index a4d488b4d84e..076591bea886 100644 --- a/pkg/jobs/jobspb/jobs.pb.go +++ b/pkg/jobs/jobspb/jobs.pb.go @@ -2561,7 +2561,7 @@ func (*Progress) XXX_OneofWrappers() []interface{} { type Job struct { Id JobID `protobuf:"varint,1,opt,name=id,proto3,customtype=JobID" json:"id"` - // Keep progress first as it may bre more relevant to see when looking at a + // Keep progress first as it may be more relevant to see when looking at a // running job. Progress *Progress `protobuf:"bytes,2,opt,name=progress,proto3" json:"progress,omitempty"` Payload *Payload `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 341a1b548c1c..a6cf624a26d3 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -760,7 +760,7 @@ enum Type { message Job { int64 id = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "JobID"]; - // Keep progress first as it may bre more relevant to see when looking at a + // Keep progress first as it may be more relevant to see when looking at a // running job. Progress progress = 2; Payload payload = 3; diff --git a/pkg/jobs/lease_test.go b/pkg/jobs/lease_test.go index e82cd5f3259f..ba9c2d027fcf 100644 --- a/pkg/jobs/lease_test.go +++ b/pkg/jobs/lease_test.go @@ -14,11 +14,13 @@ import ( "context" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -32,20 +34,25 @@ func TestJobsTableClaimFamily(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(db) var table, schema string sqlDB.QueryRow(t, `SHOW CREATE system.jobs`).Scan(&table, &schema) - if !strings.Contains(schema, `FAMILY claim (claim_session_id, claim_instance_id)`) { + if !strings.Contains( + schema, `FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run)`, + ) { t.Fatalf("expected claim family, got %q", schema) } + now := timeutil.Now() _ = sqlDB.Query(t, ` -INSERT INTO system.jobs (id, status, payload, claim_session_id, claim_instance_id) -VALUES (1, 'running', '@!%$%45', 'foo', 101)`) - +INSERT INTO system.jobs (id, status, payload, claim_session_id, claim_instance_id, num_runs, last_run) +VALUES (1, 'running', '@!%$%45', 'foo', 101, 100, $1)`, now) var status, sessionID string - var instanceID int64 - const stmt = "SELECT status, claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1" - sqlDB.QueryRow(t, stmt, 1).Scan(&status, &sessionID, &instanceID) + var instanceID, numRuns int64 + var lastRun time.Time + const stmt = "SELECT status, claim_session_id, claim_instance_id, num_runs, last_run FROM system.jobs WHERE id = $1" + sqlDB.QueryRow(t, stmt, 1).Scan(&status, &sessionID, &instanceID, &numRuns, &lastRun) require.Equal(t, "running", status) require.Equal(t, "foo", sessionID) require.Equal(t, int64(101), instanceID) + require.Equal(t, int64(100), numRuns) + require.Equal(t, timeutil.ToUnixMicros(now), timeutil.ToUnixMicros(lastRun)) } diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index 00d46df58484..339c10ccbfc7 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -26,6 +26,17 @@ type Metrics struct { JobMetrics [jobspb.NumJobTypes]*JobTypeMetrics Changefeed metric.Struct + + // AdoptIterations counts the number of adopt loops executed by Registry. + AdoptIterations *metric.Counter + + // ClaimedJobs counts the number of jobs claimed in adopt loops. + ClaimedJobs *metric.Counter + + // ResumedJobs counts the number of jobs resumed by Registry. It doesn't + // correlate with the ClaimedJobs counter because a job can be resumed + // without an adopt loop, e.g., through a StartableJob. + ResumedJobs *metric.Counter } // JobTypeMetrics is a metric.Struct containing metrics for each type of job. @@ -122,6 +133,32 @@ func makeMetaFailOrCancelFailed(typeStr string) metric.Metadata { } } +var ( + metaAdoptIterations = metric.Metadata{ + Name: "jobs.adopt_iterations", + Help: "number of job-adopt iterations performed by the registry", + Measurement: "iterations", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_GAUGE, + } + + metaClaimedJobs = metric.Metadata{ + Name: "jobs.claimed_jobs", + Help: "number of jobs claimed in job-adopt iterations", + Measurement: "jobs", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_GAUGE, + } + + metaResumedClaimedJobs = metric.Metadata{ + Name: "jobs.resumed_claimed_jobs", + Help: "number of claimed-jobs resumed in job-adopt iterations", + Measurement: "jobs", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_GAUGE, + } +) + // MetricStruct implements the metric.Struct interface. func (Metrics) MetricStruct() {} @@ -130,6 +167,9 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) { if MakeChangefeedMetricsHook != nil { m.Changefeed = MakeChangefeedMetricsHook(histogramWindowInterval) } + m.AdoptIterations = metric.NewCounter(metaAdoptIterations) + m.ClaimedJobs = metric.NewCounter(metaClaimedJobs) + m.ResumedJobs = metric.NewCounter(metaResumedClaimedJobs) for i := 0; i < jobspb.NumJobTypes; i++ { jt := jobspb.Type(i) if jt == jobspb.TypeUnspecified { // do not track TypeUnspecified diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 55435a7b0b9a..de65a5229a56 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -195,6 +195,9 @@ func MakeRegistry( } if knobs != nil { r.knobs = *knobs + if knobs.TimeSource != nil { + r.clock = knobs.TimeSource + } } r.mu.adoptedJobs = make(map[jobspb.JobID]*adoptedJob) r.metrics.init(histogramWindowInterval) @@ -404,7 +407,6 @@ VALUES ($1, $2, $3, $4, $5, $6)`, jobID, StatusRunning, payloadBytes, progressBy ); err != nil { return nil, err } - return j, nil } @@ -640,6 +642,7 @@ SELECT claim_session_id // claimJobs iterates the set of jobs which are not currently claimed and // claims jobs up to maxAdoptionsPerLoop. claimJobs := withSession(func(ctx context.Context, s sqlliveness.Session) { + r.metrics.AdoptIterations.Inc(1) if err := r.claimJobs(ctx, s); err != nil { log.Errorf(ctx, "error claiming jobs: %s", err) } @@ -768,6 +771,8 @@ func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) erro } } +// TODO (sajjad): Why are we returning column 'created' in this query? It's not +// being used. const expiredJobsQuery = "SELECT id, payload, status, created FROM system.jobs " + "WHERE (created < $1) AND (id > $2) " + "ORDER BY id " + // the ordering is important as we keep track of the maximum ID we've seen @@ -1015,6 +1020,7 @@ func (r *Registry) stepThroughStateMachine( jobType := payload.Type() log.Infof(ctx, "%s job %d: stepping through state %s with error: %+v", jobType, job.ID(), status, jobErr) jm := r.metrics.JobMetrics[jobType] + switch status { case StatusRunning: if jobErr != nil { @@ -1022,11 +1028,11 @@ func (r *Registry) stepThroughStateMachine( "job %d: resuming with non-nil error", job.ID()) } resumeCtx := logtags.AddTag(ctx, "job", job.ID()) - if payload.StartedMicros == 0 { - if err := job.started(ctx, nil /* txn */); err != nil { - return err - } + + if err := job.started(ctx, nil /* txn */); err != nil { + return err } + var err error func() { jm.CurrentlyRunning.Inc(1) diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 11b71c36272e..1d719eb742d9 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -281,7 +281,7 @@ func TestRegistrySettingUpdate(t *testing.T) { if err != nil { return } - if stmt == matchStmt { + if strings.HasPrefix(stmt, matchStmt) { atomic.AddInt32(&seen, 1) } } diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index 62d1e5884b02..0d6b3db648e5 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -13,7 +13,9 @@ package jobs import ( "context" "fmt" + "math" "strconv" + "sync/atomic" "testing" "time" @@ -21,18 +23,24 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -276,3 +284,340 @@ func TestRegistryGCPagination(t *testing.T) { db.QueryRow(t, `SELECT count(1) FROM system.jobs`).Scan(&count) require.Zero(t, count) } + +// TestRetriesWithExponentialBackoff tests the working of exponential delays +// when jobs are retried. Moreover, it tests the effectiveness of the upper +// bound on the retry delay. +func TestRetriesWithExponentialBackoff(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + unitTime := time.Millisecond + clusterSettings := func( + ctx context.Context, initialDelay time.Duration, maxDelay time.Duration, + ) *cluster.Settings { + s := cluster.MakeTestingClusterSettings() + // Set a small adopt and cancel intervals to reduce test time. + adoptIntervalSetting.Override(ctx, &s.SV, unitTime) + cancelIntervalSetting.Override(ctx, &s.SV, unitTime) + retryInitialDelaySetting.Override(ctx, &s.SV, initialDelay) + retryMaxDelaySetting.Override(ctx, &s.SV, maxDelay) + return s + } + + // createJob creates a fake job that keeps failing to be retried. + createJob := func( + ctx context.Context, s serverutils.TestServerInterface, r *Registry, kvDB *kv.DB, + ) jobspb.JobID { + id := r.MakeJobID() + require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + _, err := r.CreateJobWithTxn(ctx, Record{ + // Job does not accept an empty Details field, so arbitrarily provide + // ImportDetails. + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + }, id, txn) + return err + })) + return id + } + + validateCounts := func(t *testing.T, expectedResumed, resumed int64) { + require.Equal(t, expectedResumed, resumed, "unexpected number of jobs resumed") + } + + cancelJob := func( + t *testing.T, ctx context.Context, db *kv.DB, registry *Registry, jobID jobspb.JobID, + ) { + assert.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return registry.CancelRequested(ctx, txn, jobID) + })) + } + + waitUntilCount := func(t *testing.T, counter *metric.Counter, count int64) { + testutils.SucceedsSoon(t, func() error { + cnt := counter.Count() + if cnt >= count { + return nil + } + return errors.Errorf( + "waiting for %v to reach %d, currently at %d", counter.GetName(), count, cnt, + ) + }) + } + + // nextDelay returns the next delay based calculated from the given retryCnt + // and exponential-backoff parameters. + nextDelay := func( + retryCnt int, initDelay time.Duration, maxDelay time.Duration, + ) time.Duration { + delay := initDelay * ((1 << int(math.Min(62, float64(retryCnt)))) - 1) + if delay < 0 { + delay = maxDelay + } + return time.Duration(math.Min(float64(delay), float64(maxDelay))) + } + + initDelay := time.Second + maxDelay := time.Hour + + type config struct { + name string + retries int + testReverting bool + cancelJob bool + } + + var tests []config + for _, retries := range []int{10, 100} { + for _, reverting := range []bool{true, false} { + for i, cancel := range []bool{true, false} { + if !reverting && i > 0 { + continue + } + tests = append(tests, config{ + name: fmt.Sprintf("n%v-revert_%v-cancel_%v", retries, reverting, cancel), + retries: retries, + testReverting: reverting, + cancelJob: cancel, + }) + } + } + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + // To intercept the schema-change and the migration job. + //updateEventChan = make(chan updateEvent) + var done atomic.Value + done.Store(false) + + // We use a manual clock to control and evaluate job execution times. + // We initialize the clock with Now() because the job-creation timestamp, + // 'created' column in system.jobs, of a new job is set from txn's time. + clock := timeutil.NewManualTime(timeutil.Now()) + timeSource := hlc.NewClock(func() int64 { + return clock.Now().UnixNano() + }, base.DefaultMaxClockOffset) + + // Setup the test cluster. + cs := clusterSettings(ctx, initDelay, maxDelay) + args := base.TestServerArgs{ + Settings: cs, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: &TestingKnobs{ + TimeSource: timeSource, + }, + }, + } + + s, sqlDB, kvDB := serverutils.StartServer(t, args) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(sqlDB) + r := s.JobRegistry().(*Registry) + + RegisterConstructor(jobspb.TypeImport, func(job *Job, cs *cluster.Settings) Resumer { + return FakeResumer{ + OnResume: func(ctx context.Context) error { + if done.Load().(bool) { + return nil + } + if test.testReverting { + if test.cancelJob { + cancelJob(t, ctx, kvDB, r, job.ID()) + return nil + } + return errors.Errorf("injecting error with failure") + } + return NewRetryJobError("injecting error in running state") + }, + FailOrCancel: func(ctx context.Context) error { + if done.Load().(bool) { + return nil + } + return NewRetryJobError("injecting error in reverting state") + }, + } + }) + + jm := r.metrics.JobMetrics[jobspb.TypeImport] + adoptItrs := r.metrics.AdoptIterations + // Counting the number of times jobs are resumed. + resumed := r.metrics.ResumedJobs + validateCounts(t, 0, resumed.Count()) + + t.Log("starting a test job") + // Create a new job, which will not be claimed immediately as our clock's + // time has not advanced yet. + jobID := createJob(ctx, s, r, kvDB) + // Expected number of jobs resumed. + expResumed := int64(0) + // Number of times the job is retried, expected to follow num_runs in jobs.system. + retryCnt := 0 + // Validate that the job is claimed but not resumed. + validateCounts(t, expResumed, resumed.Count()) + + // We need to adjust the clock to correctly calculate expected retry + // time of the job. + var lastRun time.Time + tdb.QueryRow(t, + "SELECT created FROM system.jobs where id = $1", jobID, + ).Scan(&lastRun) + + if test.testReverting { + // When we test the reverting state, we first need to run the job and + // cancel it. So we advance the clock such that the job can be started. + clock.AdvanceTo(lastRun) + // wait until the running state completes, but it will not be marked as + // succeeded, resulting in the job to go in reverting state. + waitUntilCount(t, jm.FailOrCancelRetryError, 1) + expResumed = resumed.Count() + retryCnt = 1 + } + + for i := 0; i < test.retries; i++ { + // Exponential delay in the next retry. + delay := nextDelay(retryCnt, initDelay, maxDelay) + t.Logf("next retry delay: %v", delay) + // The delay must not exceed the max delay setting. It ensures that + // we are expecting correct timings from our test code, which in turn + // ensures that the jobs are resumed with correct exponential delays. + require.GreaterOrEqual(t, maxDelay, delay, "delay exceeds the max") + // Advance the clock such that it is before the next expected retry time. + clock.AdvanceTo(lastRun.Add(delay - unitTime)) + t.Logf("advanced clock by %v", delay-unitTime) + // This lets the adopt loops to run for a few times, which ensures that + // the adopt loops do not resume jobs without properly following the + // schedule. + waitUntilCount(t, adoptItrs, adoptItrs.Count()+2) + // Validate that the job is not resumed yet. + validateCounts(t, expResumed, resumed.Count()) + t.Logf("added unitTime %v", unitTime) + // Advance the clock by delta from the expected time of next retry. + clock.Advance(unitTime) + // Wait until ResumedClaimedJobs counter is incremented. + if test.testReverting { + waitUntilCount(t, jm.FailOrCancelRetryError, int64(retryCnt+1)) + } else { + waitUntilCount(t, jm.ResumeRetryError, int64(retryCnt+1)) + } + expResumed++ + retryCnt++ + // Validate that the job is resumed only once. + validateCounts(t, expResumed, resumed.Count()) + lastRun = clock.Now() + } + + done.Store(true) + // Let the job to be retried and finished. + clock.Advance(nextDelay(retryCnt, initDelay, maxDelay)) + // Wait until the job succeeds successfully. + testutils.SucceedsSoon(t, func() error { + var status Status + tdb.QueryRow(t, + "SELECT status FROM system.jobs WHERE id = $1", jobID, + ).Scan(&status) + if (test.testReverting && status == StatusCanceled) || + (!test.testReverting && status == StatusSucceeded) || + (test.testReverting && !test.cancelJob && status == StatusFailed) { + return nil + } + retryCnt++ + clock.Advance(nextDelay(retryCnt, initDelay, maxDelay)) + return errors.Errorf("waiting job %d to succeed, currently %s", jobID, status) + }) + }) + } +} + +// TestExponentialBackoffSettings tests the cluster settings of exponential backoff delays. +func TestExponentialBackoffSettings(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, test := range [...]struct { + name string // Test case ID. + // The setting to test. + settingKey string + // The value of the setting to set. + value time.Duration + }{ + { + name: "backoff initial delay setting", + settingKey: retryInitialDelaySettingKey, + value: 2 * time.Millisecond, + }, + { + name: "backoff max delay setting", + settingKey: retryMaxDelaySettingKey, + value: 2 * time.Millisecond, + }, + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + var tdb *sqlutils.SQLRunner + var finished atomic.Value + finished.Store(false) + var intercepted atomic.Value + intercepted.Store(false) + intercept := func(orig, updated JobMetadata) error { + // If this updated is not to mark as succeeded or the test has already failed. + if updated.Status != StatusSucceeded { + return nil + } + + // If marking the first time, prevent the marking and update the cluster + // setting based on test params. The setting value should be reduced + // from a large value to a small value. + if !intercepted.Load().(bool) { + tdb.Exec(t, fmt.Sprintf("SET CLUSTER SETTING %s = '%v'", test.settingKey, test.value)) + intercepted.Store(true) + return errors.Errorf("preventing the job from succeeding") + } + // Let the job to succeed. As we began with a long interval and prevented + // the job than succeeding in the first attempt, its re-execution + // indicates that the setting is updated successfully and is in effect. + finished.Store(true) + return nil + } + + // Setup the test cluster. + cs := cluster.MakeTestingClusterSettings() + // Set a small adopt interval to reduce test time. + adoptIntervalSetting.Override(ctx, &cs.SV, 2*time.Millisecond) + // Begin with a long delay. + retryInitialDelaySetting.Override(ctx, &cs.SV, time.Hour) + retryMaxDelaySetting.Override(ctx, &cs.SV, time.Hour) + args := base.TestServerArgs{ + Settings: cs, + Knobs: base.TestingKnobs{JobsTestingKnobs: &TestingKnobs{BeforeUpdate: intercept}}, + } + s, sdb, kvDB := serverutils.StartServer(t, args) + defer s.Stopper().Stop(ctx) + tdb = sqlutils.MakeSQLRunner(sdb) + // Create and run a dummy job. + RegisterConstructor(jobspb.TypeImport, func(_ *Job, cs *cluster.Settings) Resumer { + return FakeResumer{} + }) + registry := s.JobRegistry().(*Registry) + id := registry.MakeJobID() + require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + _, err := registry.CreateJobWithTxn(ctx, Record{ + // Job does not accept an empty Details field, so arbitrarily provide + // ImportDetails. + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + }, id, txn) + return err + })) + + // Wait for the job to be succeed. + testutils.SucceedsSoon(t, func() error { + if finished.Load().(bool) { + return nil + } + return errors.Errorf("waiting for the job to complete") + }) + }) + } +} diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index 241978ac53c3..fa15ad40c8d1 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // TestingKnobs are base.ModuleTestingKnobs for testing jobs related infra. @@ -59,8 +60,14 @@ type TestingKnobs struct { // returned from the state machine that transitions it from one state to // another. AfterJobStateMachine func() + + // TimeSource replaces registry's clock. + TimeSource *hlc.Clock } +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (*TestingKnobs) ModuleTestingKnobs() {} + // TestingIntervalOverrides contains variables to override the intervals and // settings of periodic tasks. type TestingIntervalOverrides struct { @@ -73,16 +80,10 @@ type TestingIntervalOverrides struct { // Gc overrides the gcIntervalSetting cluster setting. Gc *time.Duration - // Base overrides the intervalBaseSetting cluster setting. - Base *float64 - // RetentionTime overrides the retentionTimeSetting cluster setting. RetentionTime *time.Duration } -// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. -func (*TestingKnobs) ModuleTestingKnobs() {} - // NewTestingKnobsWithShortIntervals return a TestingKnobs structure with // overrides for short adopt and cancel intervals. func NewTestingKnobsWithShortIntervals() *TestingKnobs { diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index cb2c24d0be94..644d53fac50b 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -15,7 +15,9 @@ import ( "context" "fmt" "strings" + "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" @@ -36,12 +38,19 @@ import ( // changes will be ignored unless JobUpdater is used). type UpdateFn func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error +// RunStats consists of job-run statistics: num of runs and last-run timestamp. +type RunStats struct { + LastRun time.Time + NumRuns int +} + // JobMetadata groups the job metadata values passed to UpdateFn. type JobMetadata struct { ID jobspb.JobID Status Status Payload *jobspb.Payload Progress *jobspb.Progress + RunStats *RunStats } // CheckRunningOrReverting returns an InvalidStatusError if md.Status is not @@ -80,6 +89,15 @@ func (ju *JobUpdater) hasUpdates() bool { return ju.md != JobMetadata{} } +// UpdateRunStats is used to update the exponential-backoff parameters (last_run and +// num_runs in system.jobs table. +func (ju *JobUpdater) UpdateRunStats(numRuns int, lastRun time.Time) { + ju.md.RunStats = &RunStats{ + NumRuns: numRuns, + LastRun: lastRun, + } +} + // UpdateHighwaterProgressed updates job updater progress with the new high water mark. func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobUpdater) error { if err := md.CheckRunningOrReverting(); err != nil { @@ -124,13 +142,14 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF var payload *jobspb.Payload var progress *jobspb.Progress + backoffIsActive := j.registry.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error { var err error var row tree.Datums row, err = j.registry.ex.QueryRowEx( ctx, "log-job", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, - getSelectStmtForJobUpdate(j.sessionID != "", useReadLock), j.ID(), + getSelectStmtForJobUpdate(j.sessionID != "", useReadLock, backoffIsActive), j.ID(), ) if err != nil { return err @@ -144,6 +163,7 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF return errors.AssertionFailedf("job %d: expected string status, but got %T", j.ID(), statusString) } + status := Status(*statusString) if j.sessionID != "" { if row[3] == tree.DNull { return errors.Errorf( @@ -157,8 +177,6 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF j.ID(), statusString, j.sessionID, storedSession) } } - - status := Status(*statusString) if payload, err = UnmarshalPayload(row[1]); err != nil { return err } @@ -172,6 +190,28 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF Payload: payload, Progress: progress, } + + if backoffIsActive { + offset := 0 + if j.sessionID != "" { + offset = 1 + } + var lastRun *tree.DTimestamp + lastRun, ok = row[3+offset].(*tree.DTimestamp) + if !ok { + return errors.AssertionFailedf("job %d: expected timestamp last_run, but got %T", j.ID(), lastRun) + } + var numRuns *tree.DInt + numRuns, ok = row[4+offset].(*tree.DInt) + if !ok { + return errors.AssertionFailedf("job %d: expected int num_runs, but got %T", j.ID(), numRuns) + } + md.RunStats = &RunStats{ + NumRuns: int(*numRuns), + LastRun: lastRun.Time, + } + } + var ju JobUpdater if err := updateFn(txn, md, &ju); err != nil { return err @@ -181,6 +221,7 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF return err } } + if !ju.hasUpdates() { return nil } @@ -226,6 +267,11 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF addSetter("progress", progressBytes) } + if backoffIsActive && ju.md.RunStats != nil { + addSetter("last_run", ju.md.RunStats.LastRun) + addSetter("num_runs", ju.md.RunStats.NumRuns) + } + updateStmt := fmt.Sprintf( "UPDATE system.jobs SET %s WHERE id = $1", strings.Join(setters, ", "), @@ -257,21 +303,23 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateF } // getSelectStmtForJobUpdate constructs the select statement used in Job.update. -func getSelectStmtForJobUpdate(hasSessionID, useReadLock bool) string { +func getSelectStmtForJobUpdate(hasSessionID, useReadLock, backoffIsActive bool) string { const ( selectWithoutSession = `SELECT status, payload, progress` selectWithSession = selectWithoutSession + `, claim_session_id` from = ` FROM system.jobs WHERE id = $1` fromForUpdate = from + ` FOR UPDATE` + backoffColumns = ", COALESCE(last_run, created), COALESCE(num_runs, 0)" ) + stmt := selectWithoutSession if hasSessionID { - if useReadLock { - return selectWithSession + fromForUpdate - } - return selectWithSession + from + stmt = selectWithSession + } + if backoffIsActive { + stmt = stmt + backoffColumns } if useReadLock { - return selectWithoutSession + fromForUpdate + return stmt + fromForUpdate } - return selectWithoutSession + from + return stmt + from } diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index 67295a3ba331..67a05e7c42eb 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -7,11 +7,14 @@ go_library( "migration.go", "system_migration.go", "tenant_migration.go", + "testing_knobs.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/migration", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/clusterversion", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/roachpb:with-mocks", diff --git a/pkg/migration/migrationjob/migration_job.go b/pkg/migration/migrationjob/migration_job.go index cbbc8aedaf80..e74bbdf370f0 100644 --- a/pkg/migration/migrationjob/migration_job.go +++ b/pkg/migration/migrationjob/migration_job.go @@ -56,7 +56,6 @@ type resumer struct { var _ jobs.Resumer = (*resumer)(nil) func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { - execCtx := execCtxI.(sql.JobExecContext) pl := r.j.Payload() cv := *pl.GetMigration().ClusterVersion @@ -87,6 +86,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error { Settings: execCtx.ExecCfg().Settings, InternalExecutor: execCtx.ExecCfg().InternalExecutor, LeaseManager: execCtx.ExecCfg().LeaseManager, + TestingKnobs: execCtx.ExecCfg().MigrationTestingKnobs, }) default: return errors.AssertionFailedf("unknown migration type %T", m) diff --git a/pkg/migration/migrationmanager/BUILD.bazel b/pkg/migration/migrationmanager/BUILD.bazel index f4216c93cf40..2d6e75eeec45 100644 --- a/pkg/migration/migrationmanager/BUILD.bazel +++ b/pkg/migration/migrationmanager/BUILD.bazel @@ -2,14 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "migrationmanager", - srcs = [ - "manager.go", - "testing_knobs.go", - ], + srcs = ["manager.go"], importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrationmanager", visibility = ["//visibility:public"], deps = [ - "//pkg/base", "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", @@ -39,7 +35,6 @@ go_test( "manager_external_test.go", ], deps = [ - ":migrationmanager", "//pkg/base", "//pkg/clusterversion", "//pkg/jobs", diff --git a/pkg/migration/migrationmanager/manager.go b/pkg/migration/migrationmanager/manager.go index f3e974ed36a0..76abc95d4c6f 100644 --- a/pkg/migration/migrationmanager/manager.go +++ b/pkg/migration/migrationmanager/manager.go @@ -44,7 +44,7 @@ type Manager struct { jr *jobs.Registry codec keys.SQLCodec settings *cluster.Settings - knobs TestingKnobs + knobs migration.TestingKnobs } // GetMigration returns the migration associated with this key. @@ -71,9 +71,9 @@ func NewManager( jr *jobs.Registry, codec keys.SQLCodec, settings *cluster.Settings, - testingKnobs *TestingKnobs, + testingKnobs *migration.TestingKnobs, ) *Manager { - var knobs TestingKnobs + var knobs migration.TestingKnobs if testingKnobs != nil { knobs = *testingKnobs } diff --git a/pkg/migration/migrationmanager/manager_external_test.go b/pkg/migration/migrationmanager/manager_external_test.go index f0389b5d41de..4689c19f6cf3 100644 --- a/pkg/migration/migrationmanager/manager_external_test.go +++ b/pkg/migration/migrationmanager/manager_external_test.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/migration" - "github.com/cockroachdb/cockroach/pkg/migration/migrationmanager" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -68,7 +67,7 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { BinaryVersionOverride: startCV.Version, DisableAutomaticVersionUpgrade: 1, }, - MigrationManager: &migrationmanager.TestingKnobs{ + MigrationManager: &migration.TestingKnobs{ ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion { return []clusterversion.ClusterVersion{to} }, @@ -202,7 +201,7 @@ func TestMigrateUpdatesReplicaVersion(t *testing.T) { BinaryVersionOverride: startCV.Version, DisableAutomaticVersionUpgrade: 1, }, - MigrationManager: &migrationmanager.TestingKnobs{ + MigrationManager: &migration.TestingKnobs{ ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion { return []clusterversion.ClusterVersion{from, to} }, @@ -320,7 +319,7 @@ func TestConcurrentMigrationAttempts(t *testing.T) { BinaryVersionOverride: versions[0].Version, DisableAutomaticVersionUpgrade: 1, }, - MigrationManager: &migrationmanager.TestingKnobs{ + MigrationManager: &migration.TestingKnobs{ ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion { return versions }, @@ -402,7 +401,7 @@ func TestPauseMigration(t *testing.T) { BinaryVersionOverride: startCV.Version, DisableAutomaticVersionUpgrade: 1, }, - MigrationManager: &migrationmanager.TestingKnobs{ + MigrationManager: &migration.TestingKnobs{ ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion { return []clusterversion.ClusterVersion{to} }, diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index cf58c748ac10..4afafc3af6cc 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "fix_descriptor_migration.go", "join_tokens.go", "migrations.go", + "retry_jobs_with_exponential_backoff.go", "sql_stats.go", "tenant_usage.go", "truncated_state.go", @@ -16,17 +17,21 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/migration", "//pkg/roachpb:with-mocks", + "//pkg/security", "//pkg/server/serverpb", + "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catalogkv", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/systemschema", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/startupmigrations", "//pkg/util/encoding", @@ -34,6 +39,7 @@ go_library( "//pkg/util/log", "//pkg/util/protoutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_kr_pretty//:pretty", ], ) @@ -42,24 +48,38 @@ go_test( srcs = [ "delete_deprecated_namespace_tabledesc_external_test.go", "fix_descriptor_migration_external_test.go", + "helpers_test.go", "main_test.go", + "retry_jobs_with_exponential_backoff_external_test.go", "truncated_state_external_test.go", ], + embed = [":migrations"], deps = [ "//pkg/base", "//pkg/clusterversion", + "//pkg/jobs", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver", "//pkg/kv/kvserver/stateloader", + "//pkg/migration", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catalogkv", "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/descs", + "//pkg/sql/catalog/lease", "//pkg/sql/catalog/systemschema", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/sem/tree", + "//pkg/sql/sqlutil", + "//pkg/sql/types", + "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", @@ -67,6 +87,7 @@ go_test( "//pkg/util/log", "//pkg/util/protoutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/migration/migrations/helpers_test.go b/pkg/migration/migrations/helpers_test.go new file mode 100644 index 000000000000..7f6beffbb585 --- /dev/null +++ b/pkg/migration/migrations/helpers_test.go @@ -0,0 +1,24 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import "github.com/cockroachdb/cockroach/pkg/sql/catalog" + +func HasBackoffCols(jobsTable catalog.TableDescriptor, col string) (bool, error) { + return hasColumn(jobsTable, col) +} + +func HasBackoffIndex(jobsTable catalog.TableDescriptor, index string) (bool, error) { + return hasIndex(jobsTable, index) +} + +const AddColsQuery = addColsQuery +const AddIndexQuery = addIndexQuery diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index a8adc5ae9bfa..a6570ef7548d 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -77,6 +77,10 @@ var migrations = []migration.Migration{ toCV(clusterversion.TenantUsageTable), tenantUsageTableMigration, ), + migration.NewTenantMigration( + "add last_run and num_runs columns to system.jobs", + toCV(clusterversion.RetryJobsWithExponentialBackoff), + retryJobsWithExponentialBackoff), } func init() { diff --git a/pkg/migration/migrations/retry_jobs_with_exponential_backoff.go b/pkg/migration/migrations/retry_jobs_with_exponential_backoff.go new file mode 100644 index 000000000000..f01c8c06d297 --- /dev/null +++ b/pkg/migration/migrations/retry_jobs_with_exponential_backoff.go @@ -0,0 +1,271 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "bytes" + "context" + "fmt" + "strings" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" + "github.com/kr/pretty" +) + +// Target schema changes in the system.jobs table, adding two columns and an +// index for job retries with exponential backoff functionality. +const ( + addColsQuery = ` +ALTER TABLE system.jobs + ADD COLUMN num_runs INT8 FAMILY claim, + ADD COLUMN last_run TIMESTAMP FAMILY claim` + addIndexQuery = ` +CREATE INDEX jobs_run_stats_idx + ON system.jobs (claim_session_id, status, created) + STORING (last_run, num_runs, claim_instance_id) + WHERE ` + systemschema.JobsRunStatsIdxPredicate +) + +// retryJobsWithExponentialBackoff changes the schema of system.jobs table in +// two steps. It first adds two new columns and then an index. +func retryJobsWithExponentialBackoff( + ctx context.Context, cs clusterversion.ClusterVersion, d migration.TenantDeps, +) error { + ops := [...]struct { + // Operation name. + name string + // List of schema names, e.g., column names, which are modified in the query. + schemaList []string + // Schema change query. + query string + // Function to check existing schema. + schemaExistsFn func(catalog.TableDescriptor, string) (bool, error) + }{ + {"jobs-add-columns", []string{"num_runs", "last_run"}, addColsQuery, hasColumn}, + {"jobs-add-index", []string{"jobs_run_stats_idx"}, addIndexQuery, hasIndex}, + } + for _, op := range ops { + if err := migrateTable(ctx, cs, d, op.name, keys.JobsTableID, op.query, + func(table catalog.TableDescriptor) (bool, error) { + // Expect all or none. + var exists bool + for i, schemaName := range op.schemaList { + hasSchema, err := op.schemaExistsFn(table, schemaName) + if err != nil { + return false, err + } + if i > 0 && exists != hasSchema { + return false, errors.Errorf("observed partial schema exists while performing %v", op.name) + } + exists = hasSchema + } + return exists, nil + }); err != nil { + return err + } + } + return nil +} + +// migrateTable changes the table schema based on the given schema-change query. +// The change is ignored if the table already has the required changes, which are +// explicitly limited to two new columns (last_run and num_runs) and an index +// (jobs_run_stats_idx) in this migration. +func migrateTable( + ctx context.Context, + _ clusterversion.ClusterVersion, + d migration.TenantDeps, + opTag string, + tableID descpb.ID, + schemaChangeQuery string, + schemaExists func(descriptor catalog.TableDescriptor) (bool, error), +) error { + for { + // - Fetch the table, reading its descriptor from storage. + // - Check if any mutation job exists for the table. These mutations can + // belong to a previous migration attempt that failed. + // - If any mutation job exists: + // - Wait for the ongoing mutations to complete. + // - Continue to the beginning of the loop to cater for the mutations + // that started in while waiting. + // - Check if the intended schema-changes already exist. + // - If already exists, skip the schema-change and return as it has already + // completed in a previous migration attempt. + // - Otherwise, perform the schema-change and return. + + log.Infof(ctx, "performing table migration operation %v", opTag) + // Retrieve the table. + jt, err := readTableDescriptor(ctx, d, tableID) + if err != nil { + return err + } + // Wait for any in-flight schema changes to complete. + if mutations := jt.GetMutationJobs(); len(mutations) > 0 { + for _, mutation := range mutations { + log.Infof(ctx, "waiting for the mutation job %v to complete", mutation.JobID) + if d.TestingKnobs.BeforeWaitInRetryJobsWithExponentialBackoffMigration != nil { + d.TestingKnobs.BeforeWaitInRetryJobsWithExponentialBackoffMigration(jobspb.JobID(mutation.JobID)) + } + if _, err := d.InternalExecutor.Exec(ctx, "migration-mutations-wait", + nil, "SHOW JOB WHEN COMPLETE $1", mutation.JobID); err != nil { + return err + } + } + continue + } + + // Ignore the schema change if the table already has the required schema. + if ok, err := schemaExists(jt); err != nil { + return errors.Wrapf(err, "error while validating descriptors during"+ + " operation %s", opTag) + } else if ok { + log.Infof(ctx, "skipping %s operation as the schema change already exists.", opTag) + // TODO(sajjad): To discuss: This seems not the best way to ensure that we + // are ignoring a mutation. We should have some other direct invariant, + // e.g., reading some counter from the table descriptor or in-memory state + // that counts the number of times the table has been successfully updated. + if d.TestingKnobs != nil && d.TestingKnobs.SkippedMutation != nil { + d.TestingKnobs.SkippedMutation() + } + return nil + } + + // Modify the table. + log.Infof(ctx, "performing operation: %s", opTag) + if _, err := d.InternalExecutor.ExecEx( + ctx, + fmt.Sprintf("migration-alter-table-%d", tableID), + nil, /* txn */ + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + schemaChangeQuery); err != nil { + return err + } + return nil + } +} + +func readTableDescriptor( + ctx context.Context, d migration.TenantDeps, tableID descpb.ID, +) (catalog.TableDescriptor, error) { + var jt catalog.TableDescriptor + if err := descs.Txn(ctx, d.Settings, d.LeaseManager, d.InternalExecutor, d.DB, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) (err error) { + jt, err = descriptors.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + AvoidCached: true, + Required: true, + }, + }) + return err + }); err != nil { + return nil, err + } + return jt, nil +} + +// hasColumn return true if storedTable already has the given column. storedTable +// descriptor must be read from the system as compared to reading from the +// systemschema package. +// This function returns an error if the column exists but doesn't match with the +// table's descriptor defined in systemschema/system.go. The comparison is not strict +// as several descriptor fields are ignored. +func hasColumn(storedTable catalog.TableDescriptor, colName string) (bool, error) { + storedCol, err := storedTable.FindColumnWithName(tree.Name(colName)) + if err != nil { + if strings.Contains(err.Error(), "does not exist") { + return false, nil + } + return false, err + } + + expectedCol, err := systemschema.JobsTable.FindColumnWithName(tree.Name(colName)) + if err != nil { + return false, errors.Wrapf(err, "columns name %s is invalid.", colName) + } + + expectedCopy := expectedCol.ColumnDescDeepCopy() + storedCopy := storedCol.ColumnDescDeepCopy() + + storedCopy.ID = 0 + expectedCopy.ID = 0 + + if err = ensureProtoMessagesAreEqual(&expectedCopy, &storedCopy); err != nil { + return false, err + } + return true, nil +} + +// hasIndex return true if the table has the given index. It returns +// an error if the index exists but doesn't match with the table's +// descriptor defined in systemschema/system.go. The comparison is not strict +// as several descriptor fields are ignored. +func hasIndex(storedTable catalog.TableDescriptor, indexName string) (bool, error) { + storedIdx, err := storedTable.FindIndexWithName(indexName) + if err != nil { + if strings.Contains(err.Error(), "does not exist") { + return false, nil + } + return false, err + } + expectedIdx, err := systemschema.JobsTable.FindIndexWithName(indexName) + if err != nil { + return false, errors.Wrapf(err, "index name %s is invalid", indexName) + } + storedCopy := storedIdx.IndexDescDeepCopy() + expectedCopy := expectedIdx.IndexDescDeepCopy() + // Ignore the fields that don't matter in the comparison. + storedCopy.ID = 0 + expectedCopy.ID = 0 + storedCopy.Version = 0 + expectedCopy.Version = 0 + storedCopy.CreatedExplicitly = false + expectedCopy.CreatedExplicitly = false + expectedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} + storedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} + + if err = ensureProtoMessagesAreEqual(&expectedCopy, &storedCopy); err != nil { + return false, err + } + return true, nil +} + +// ensureProtoMessagesAreEqual verifies whether the given protobufs are equal or +// not, returning an error if they are not equal. +func ensureProtoMessagesAreEqual(expected, found protoutil.Message) error { + expectedBytes, err := protoutil.Marshal(expected) + if err != nil { + return err + } + foundBytes, err := protoutil.Marshal(found) + if err != nil { + return err + } + if bytes.Equal(expectedBytes, foundBytes) { + return nil + } + return errors.Errorf("expected descriptor doesn't match "+ + "with found descriptor: %s", strings.Join(pretty.Diff(expected, found), "\n")) +} diff --git a/pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go b/pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go new file mode 100644 index 000000000000..9664320f8f8d --- /dev/null +++ b/pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go @@ -0,0 +1,683 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations_test + +import ( + "context" + gosql "database/sql" + "regexp" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/migration/migrations" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestExponentialBackoffMigration tests modification of system.jobs table +// during migration. It does not test the migration success during failures. +func TestExponentialBackoffMigration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.RetryJobsWithExponentialBackoff - 1), + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }, + } + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + defer tc.Stopper().Stop(ctx) + s := tc.Server(0) + sqlDB := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + // Inject the old copy of the descriptor. + injectLegacyTable(t, ctx, s) + // Validate that the jobs table has old schema. + validateSchemaExists(t, ctx, s, sqlDB, false) + // Run the migration. + migrate(t, sqlDB, false, nil) + // Validate that the jobs table has new schema. + validateSchemaExists(t, ctx, s, sqlDB, true) + // Make sure that jobs work by running a job. + runGcJob(t, tdb) +} + +type updateEvent struct { + orig, updated jobs.JobMetadata + errChan chan error +} + +// TestMigrationWithFailures tests modification of system.jobs table during +// migration with different failures. It tests the system behavior with failure +// combinations of the migration job and schema-change jobs at different stages +// in their progress. +func TestMigrationWithFailures(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // The tests follows the following procedure. + // + // Inject the old table descriptor and ensure that the system is using the + // deprecated jobs-table. + // + // Start migration, which initiates two schema-change jobs one by one. Test + // the system for each schema-change job separately. Later on, we inject + // failure in this migration, causing it to fail. + // + // Depending on the test setting, intercept the target schema-change job, + // preventing the job from progressing. We may cancel this schema-change or + // let it succeed to test different scenarios. + // + // Cancel the migration, causing the migration to revert and fail. + // + // Wait for the canceled migration-job to finish, expecting its failure. The + // schema-change job is still not progressing to control what the restarted + // migration will observe. + // + // Restart the migration, expecting it to succeed. Depending on the test setting, + // the intercepted schema-change job may wail for the migration job to resume. + // If it does, the migration job is expected to observe the ongoing schema-change. + // The ongoing schema-change is canceled or not, depending on the test case. + // In either case, we expect the correct number of mutations to be skipped + // during the migration. + // + // If we canceled the schema-job, expect it to rerun + // as part of the migration. Otherwise, expect the schema-change to be ignored + // during the migration. + // + // Finally, we validate that the schema changes are in effect by reading the new + // columns and the index, and by running a job that is failed and retried to + // practice exponential-backoff machinery. + + for _, test := range []struct { + // Test identifier. + name string + // Job status when the job is intercepted while transitioning to the intercepted status. + query string + // Whether the schema-change job should wait for the migration to restart + // after failure before proceeding. + waitForMigrationRestart bool + // Cancel the intercepted schema-change to inject a failure during migration. + cancelSchemaJob bool + // Expected number of schema-changes that are skipped during migration. + expectedSkipped int + }{ + { + name: "adding columns", + query: migrations.AddColsQuery, + waitForMigrationRestart: false, // Does not matter. + cancelSchemaJob: false, // Does not matter. + expectedSkipped: 0, // Will be ignored. + }, + { + name: "adding index", + query: migrations.AddIndexQuery, + waitForMigrationRestart: false, // Does not matter. + cancelSchemaJob: false, // Does not matter. + expectedSkipped: 0, // Will be ignored. + }, + { + name: "fail adding columns", + query: migrations.AddColsQuery, + waitForMigrationRestart: true, // Need to wait to observe failing schema change. + cancelSchemaJob: true, // To fail adding columns. + expectedSkipped: 0, + }, + { + name: "fail adding index", + query: migrations.AddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe failing schema change. + cancelSchemaJob: true, // To fail adding index. + expectedSkipped: 1, // Columns must not be added again. + }, + { + name: "skip none", + query: migrations.AddColsQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: true, // To fail adding index and skip adding column. + expectedSkipped: 0, // Both columns and index must be added. + }, + { + name: "skip adding columns", + query: migrations.AddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: true, // To fail adding index and skip adding column. + expectedSkipped: 1, // Columns must not be added again. + }, + { + name: "skip adding columns and index", + query: migrations.AddIndexQuery, + waitForMigrationRestart: true, // Need to wait to observe schema change and have correct expectedSkipped count. + cancelSchemaJob: false, // To fail adding index and skip adding column. + expectedSkipped: 2, // Both columns and index must not be added again. + }, + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + // To intercept the schema-change and the migration job. + updateEventChan := make(chan updateEvent) + beforeUpdate := func(orig, updated jobs.JobMetadata) error { + ue := updateEvent{ + orig: orig, + updated: updated, + errChan: make(chan error), + } + updateEventChan <- ue + return <-ue.errChan + } + + var schemaEvent updateEvent + migrationWaitCh := make(chan struct{}) + beforeMutationWait := func(jobID jobspb.JobID) { + if !test.waitForMigrationRestart || jobID != schemaEvent.orig.ID { + return + } + migrationWaitCh <- struct{}{} + } + + // Number of schema-change jobs that are skipped. + skippedCnt := int32(0) + ignoredMutationObserver := func() { + atomic.AddInt32(&skippedCnt, 1) + } + + shortInterval := 2 * time.Millisecond + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.RetryJobsWithExponentialBackoff - 1), + }, + JobsTestingKnobs: &jobs.TestingKnobs{ + IntervalOverrides: jobs.TestingIntervalOverrides{ + Adopt: &shortInterval, + Cancel: &shortInterval, + }, + BeforeUpdate: beforeUpdate, + }, + MigrationManager: &migration.TestingKnobs{ + BeforeWaitInRetryJobsWithExponentialBackoffMigration: beforeMutationWait, + SkippedMutation: ignoredMutationObserver, + }, + }, + }, + } + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + defer tc.Stopper().Stop(ctx) + s := tc.Server(0) + sqlDB := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + tdb.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.gc = '2ms'") + // Inject the old copy of the descriptor. + injectLegacyTable(t, ctx, s) + // Validate that the jobs-table has old schema. + validateSchemaExists(t, ctx, s, sqlDB, false) + // Run the migration, expecting failure. + t.Log("trying migration, expecting to fail") + // Channel to wait for the migration job to complete. + finishChan := make(chan struct{}) + go migrate(t, sqlDB, true, finishChan) + + var migJobID jobspb.JobID + // Intercept the target schema-change job and get migration-job's ID. + t.Log("intercepting the schema job") + for { + e := <-updateEventChan + // The migration job creates schema-change jobs. Therefore, we are guaranteed + // to get the migration-job's ID before canceling the job later on. + if e.orig.Payload.Type() == jobspb.TypeMigration { + migJobID = e.orig.ID + e.errChan <- nil + continue + } + schemaQuery := strings.Replace(e.orig.Payload.Description, "system.public.jobs", "system.jobs", -1) + testQuery := removeSpaces(test.query) + testQuery = strings.ReplaceAll(testQuery, ":::STRING", "") + if testQuery == schemaQuery { + // Intercepted the target schema-change. + schemaEvent = e + t.Logf("intercepted schema change job: %v", e.orig.ID) + break + } + // Ignore all other job updates. + e.errChan <- nil + } + // Cancel the migration job. + t.Log("canceling the migration job") + go cancelJob(t, ctx, s, migJobID) + + // Wait for the migration job to finish while preventing the intercepted + // schema-change job from progressing. + t.Log("waiting for the migration job to finish.") + testutils.SucceedsSoon(t, func() error { + for { + select { + case <-finishChan: + return nil + case e := <-updateEventChan: + e.errChan <- nil + default: + return errors.Errorf("waiting for the migration job to finish.") + } + } + }) + + // Channel to finish the goroutine when the test completes. + done := make(chan struct{}) + // Let all jobs to continue until test's completion, except the intercepted + // schema-change job that we resume later on. + go func() { + for { + select { + case e := <-updateEventChan: + e.errChan <- nil + case <-done: + return + } + } + }() + + // Restart the migration job. + t.Log("retrying migration, expecting to succeed") + go migrate(t, sqlDB, false, finishChan) + + // Wait until the new migration job observes an existing mutation job. + if test.waitForMigrationRestart { + t.Log("waiting for the migration job to observe a mutation") + <-migrationWaitCh + } + + t.Log("resuming the schema change job") + // If configured so, mark the schema-change job to cancel. + if test.cancelSchemaJob { + cancelJob(t, ctx, s, schemaEvent.orig.ID) + } + // Resume the schema-change job and all other jobs. + schemaEvent.errChan <- nil + + // If canceled the job, wait for the job to finish. + if test.cancelSchemaJob { + t.Log("waiting for the schema job to reach the cancel status") + waitUntilState(t, tdb, schemaEvent.orig.ID, jobs.StatusCanceled) + } + + // Wait for the migration to complete, expecting success. + t.Logf("waiting for the new migration job to complete.") + testutils.SucceedsSoon(t, func() error { + select { + case <-finishChan: + return nil + default: + } + return errors.Errorf("waiting for the migration job to finish.") + }) + if test.waitForMigrationRestart { + // Ensure that we have observed the expected number of ignored schema change jobs. + require.Equal(t, int32(test.expectedSkipped), atomic.LoadInt32(&skippedCnt)) + } + + // Validate that the jobs table has new schema. + validateSchemaExists(t, ctx, s, sqlDB, true) + done <- struct{}{} + validateJobRetries(t, tdb, updateEventChan) + }) + } +} + +// cancelJob marks the given job as cancel-requested, leading the job to be +// canceled. +func cancelJob( + t *testing.T, ctx context.Context, s serverutils.TestServerInterface, jobID jobspb.JobID, +) { + err := s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // Using this way of canceling because the migration job us non-cancelable. + // Canceling in this way skips the check. + return s.JobRegistry().(*jobs.Registry).UpdateJobWithTxn( + ctx, jobID, txn, false /* useReadLock */, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, + ) error { + ju.UpdateStatus(jobs.StatusCancelRequested) + return nil + }) + }) + assert.NoError(t, err) +} + +// waitUntilState waits until the specified job reaches to given state. +func waitUntilState( + t *testing.T, tdb *sqlutils.SQLRunner, jobID jobspb.JobID, expectedStatus jobs.Status, +) { + testutils.SucceedsSoon(t, func() error { + var status jobs.Status + tdb.QueryRow(t, + "SELECT status FROM system.jobs WHERE id = $1", jobID, + ).Scan(&status) + if status == expectedStatus { + return nil + } + return errors.Errorf( + "waiting for job %v to reach status %v, current status is %v", + jobID, expectedStatus, status) + }) +} + +// migrate runs cluster migration by changing the 'version' cluster setting. +func migrate(t *testing.T, sqlDB *gosql.DB, expectError bool, done chan struct{}) { + defer func() { + if done != nil { + done <- struct{}{} + } + }() + _, err := sqlDB.Exec(`SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(clusterversion.RetryJobsWithExponentialBackoff).String()) + if expectError { + assert.Error(t, err) + return + } + assert.NoError(t, err) +} + +// injectLegacyTable overwrites the existing table descriptor with the previous +// table descriptor. +func injectLegacyTable(t *testing.T, ctx context.Context, s serverutils.TestServerInterface) { + err := descs.Txn( + ctx, + s.ClusterSettings(), + s.LeaseManager().(*lease.Manager), + s.InternalExecutor().(sqlutil.InternalExecutor), + s.DB(), + func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + id := systemschema.JobsTable.GetID() + tab, err := descriptors.GetMutableTableByID(ctx, txn, id, tree.ObjectLookupFlagsWithRequired()) + if err != nil { + return err + } + builder := tabledesc.NewBuilder(deprecatedDescriptor()) + require.NoError(t, builder.RunPostDeserializationChanges(ctx, nil)) + tab.TableDescriptor = builder.BuildCreatedMutableTable().TableDescriptor + tab.Version = tab.ClusterVersion.Version + 1 + return descriptors.WriteDesc(ctx, false, tab, txn) + }) + require.NoError(t, err) +} + +// validateSchemaExists validates whether the schema-changes of the system.jobs +// table exist or not. +func validateSchemaExists( + t *testing.T, + ctx context.Context, + s serverutils.TestServerInterface, + sqlDB *gosql.DB, + expectExists bool, +) { + // First validate by reading the columns and the index. + for _, stmt := range []string{ + "SELECT last_run, num_runs FROM system.jobs LIMIT 0", + "SELECT num_runs, last_run, claim_instance_id from system.jobs@jobs_run_stats_idx LIMIT 0", + } { + _, err := sqlDB.Exec(stmt) + if expectExists { + require.NoError( + t, err, "expected schema to exist, but unable to query it, using statement: %s", stmt, + ) + } else { + require.Error( + t, err, "expected schema to not exist, but queried it successfully, using statement: %s", stmt, + ) + } + } + + // Manually verify the table descriptor. + table := jobsTable(t, ctx, s) + str := "not have" + if expectExists { + str = "have" + } + for _, schema := range [...]struct { + name string + validationFn func(catalog.TableDescriptor, string) (bool, error) + }{ + // TODO(sajjad): To discuss: Shouldn't we use an independent way to verify + // instead of reusing HasBackoffCols and HasBackoffIndex functions from + // the migration itself? If not, I think we should at least write a unit + // test to validate the correctness of these functions. + {"num_runs", migrations.HasBackoffCols}, + {"last_run", migrations.HasBackoffCols}, + {"jobs_run_stats_idx", migrations.HasBackoffIndex}, + } { + updated, err := schema.validationFn(table, schema.name) + require.NoError(t, err) + require.Equal(t, expectExists, updated, + "expected jobs table to %s %s", str, schema) + } +} + +// jobsTable returns the system.jobs table descriptor, reading it from the +// storage. +func jobsTable( + t *testing.T, ctx context.Context, s serverutils.TestServerInterface, +) catalog.TableDescriptor { + var table catalog.TableDescriptor + // Retrieve the jobs table. + err := descs.Txn(ctx, + s.ClusterSettings(), + s.LeaseManager().(*lease.Manager), + s.InternalExecutor().(sqlutil.InternalExecutor), + s.DB(), + func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) (err error) { + table, err = descriptors.GetImmutableTableByID(ctx, txn, keys.JobsTableID, tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + AvoidCached: true, + Required: true, + }, + }) + return err + }) + require.NoError(t, err) + return table +} + +// runGcJob creates and alters a dummy table to trigger jobs machinery, +// which validates its working. +func runGcJob(t *testing.T, tdb *sqlutils.SQLRunner) { + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + tdb.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 1;") + tdb.Exec(t, "DROP TABLE foo CASCADE;") + var jobID int64 + tdb.QueryRow(t, ` +SELECT job_id + FROM [SHOW JOBS] + WHERE job_type = 'SCHEMA CHANGE GC' AND description LIKE '%foo%';`, + ).Scan(&jobID) + var status jobs.Status + tdb.QueryRow(t, + "SELECT status FROM [SHOW JOB WHEN COMPLETE $1]", jobID, + ).Scan(&status) + require.Equal(t, jobs.StatusSucceeded, status) +} + +func validateJobRetries(t *testing.T, tdb *sqlutils.SQLRunner, eventCh chan updateEvent) { + tdb.Exec(t, "SET CLUSTER SETTING jobs.registry.retry.initial_delay = '2ms'") + tdb.Exec(t, "SET CLUSTER SETTING jobs.registry.retry.max_delay = '10ms'") + done := make(chan struct{}) + go func() { + // Fail a GC job once and then let it succeed. + var failed atomic.Value + failed.Store(false) + var ev updateEvent + for { + // eventCh receives events in the BeforeUpdate hook. + select { + case ev = <-eventCh: + case <-done: + return + } + // If not a schema-change GC job, let it run. + if ev.orig.Payload.Type() != jobspb.TypeSchemaChangeGC { + ev.errChan <- nil + continue + } + if ev.updated.Status == jobs.StatusSucceeded { + // If the job is succeeding, it must have been retried once, and the + // the number of retries is populated from the jobs table. + assert.Equal(t, 1, ev.orig.RunStats.NumRuns) + } + if failed.Load().(bool) || + ev.updated.Status != jobs.StatusRunning { + ev.errChan <- nil + continue + } + failed.Store(true) + ev.errChan <- jobs.NewRetryJobError("failing job to retry") + } + }() + runGcJob(t, tdb) + done <- struct{}{} +} + +func removeSpaces(stmt string) string { + stmt = strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(stmt, " ")) + stmt = strings.ReplaceAll(stmt, "( ", "(") + stmt = strings.ReplaceAll(stmt, " )", ")") + return stmt +} + +// deprecatedDescriptor returns the system.jobs table descriptor that was being used +// before adding two new columns and an index in the current version. +func deprecatedDescriptor() *descpb.TableDescriptor { + uniqueRowIDString := "unique_rowid()" + nowString := "now():::TIMESTAMP" + pk := func(name string) descpb.IndexDescriptor { + return descpb.IndexDescriptor{ + Name: tabledesc.PrimaryKeyIndexName, + ID: 1, + Unique: true, + KeyColumnNames: []string{name}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{1}, + } + } + + return &descpb.TableDescriptor{ + Name: "jobs", + ID: keys.JobsTableID, + ParentID: keys.SystemDatabaseID, + UnexposedParentSchemaID: keys.PublicSchemaID, + Version: 1, + Columns: []descpb.ColumnDescriptor{ + {Name: "id", ID: 1, Type: types.Int, DefaultExpr: &uniqueRowIDString}, + {Name: "status", ID: 2, Type: types.String}, + {Name: "created", ID: 3, Type: types.Timestamp, DefaultExpr: &nowString}, + {Name: "payload", ID: 4, Type: types.Bytes}, + {Name: "progress", ID: 5, Type: types.Bytes, Nullable: true}, + {Name: "created_by_type", ID: 6, Type: types.String, Nullable: true}, + {Name: "created_by_id", ID: 7, Type: types.Int, Nullable: true}, + {Name: "claim_session_id", ID: 8, Type: types.Bytes, Nullable: true}, + {Name: "claim_instance_id", ID: 9, Type: types.Int, Nullable: true}, + }, + NextColumnID: 10, + Families: []descpb.ColumnFamilyDescriptor{ + { + Name: "fam_0_id_status_created_payload", + ID: 0, + ColumnNames: []string{"id", "status", "created", "payload", "created_by_type", "created_by_id"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 6, 7}, + }, + { + Name: "progress", + ID: 1, + ColumnNames: []string{"progress"}, + ColumnIDs: []descpb.ColumnID{5}, + DefaultColumnID: 5, + }, + { + Name: "claim", + ID: 2, + ColumnNames: []string{"claim_session_id", "claim_instance_id"}, + ColumnIDs: []descpb.ColumnID{8, 9}, + }, + }, + NextFamilyID: 3, + PrimaryIndex: pk("id"), + Indexes: []descpb.IndexDescriptor{ + { + Name: "jobs_status_created_idx", + ID: 2, + Unique: false, + KeyColumnNames: []string{"status", "created"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{2, 3}, + KeySuffixColumnIDs: []descpb.ColumnID{1}, + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + }, + { + Name: "jobs_created_by_type_created_by_id_idx", + ID: 3, + Unique: false, + KeyColumnNames: []string{"created_by_type", "created_by_id"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{6, 7}, + StoreColumnIDs: []descpb.ColumnID{2}, + StoreColumnNames: []string{"status"}, + KeySuffixColumnIDs: []descpb.ColumnID{1}, + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + }, + }, + NextIndexID: 4, + Privileges: descpb.NewCustomSuperuserPrivilegeDescriptor( + descpb.SystemAllowedPrivileges[keys.JobsTableID], security.NodeUserName()), + FormatVersion: descpb.InterleavedFormatVersion, + NextMutationID: 1, + } +} diff --git a/pkg/migration/tenant_migration.go b/pkg/migration/tenant_migration.go index c92477be1f97..d4a2943b4192 100644 --- a/pkg/migration/tenant_migration.go +++ b/pkg/migration/tenant_migration.go @@ -31,6 +31,7 @@ type TenantDeps struct { Settings *cluster.Settings LeaseManager *lease.Manager InternalExecutor sqlutil.InternalExecutor + TestingKnobs *TestingKnobs } // TenantMigrationFunc is used to perform sql-level migrations. It may be run from diff --git a/pkg/migration/migrationmanager/testing_knobs.go b/pkg/migration/testing_knobs.go similarity index 50% rename from pkg/migration/migrationmanager/testing_knobs.go rename to pkg/migration/testing_knobs.go index 669aec7b1123..bfe6566b01ab 100644 --- a/pkg/migration/migrationmanager/testing_knobs.go +++ b/pkg/migration/testing_knobs.go @@ -8,12 +8,12 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package migrationmanager +package migration import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" ) // TestingKnobs are knobs to inject behavior into the migration manager which @@ -26,7 +26,24 @@ type TestingKnobs struct { ListBetweenOverride func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion // RegistryOverride is used to inject migrations for specific cluster versions. - RegistryOverride func(cv clusterversion.ClusterVersion) (migration.Migration, bool) + RegistryOverride func(cv clusterversion.ClusterVersion) (Migration, bool) + + // BeforeWaitInRetryJobsWithExponentialBackoffMigration is called before + // waiting for a mutation job to complete in retryJobsWithExponentialBackoff + // migration. + // TODO(sajjad): Remove this knob when the related migration code is removed. + // This knob is used only in exponential backoff migration and related tests. See + // pkg/migration/retry_jobs_with_exponential_backoff.go and + // pkg/migration/retry_jobs_with_exponential_backoff_external_test.go + BeforeWaitInRetryJobsWithExponentialBackoffMigration func(jobspb.JobID) + + // SkippedMutation is called if a mutation job is skipped as part of the + // retryJobsWithExponentialBackoff migration. + // TODO(sajjad): Remove this knob when the related migration code is removed. + // This knob is used only in exponential backoff migration and related tests. See + // pkg/migration/retry_jobs_with_exponential_backoff.go and + // pkg/migration/retry_jobs_with_exponential_backoff_external_test.go + SkippedMutation func() } // ModuleTestingKnobs makes TestingKnobs a base.ModuleTestingKnobs. diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 9f8948af433d..7f64e5b7b9c6 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -732,12 +732,13 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { c = migrationcluster.NewTenantCluster(cfg.db) } - knobs, _ := cfg.TestingKnobs.MigrationManager.(*migrationmanager.TestingKnobs) + knobs, _ := cfg.TestingKnobs.MigrationManager.(*migration.TestingKnobs) migrationMgr := migrationmanager.NewManager( c, cfg.circularInternalExecutor, jobRegistry, codec, cfg.Settings, knobs, ) execCfg.MigrationJobDeps = migrationMgr execCfg.VersionUpgradeHook = migrationMgr.Migrate + execCfg.MigrationTestingKnobs = knobs } temporaryObjectCleaner := sql.NewTemporaryObjectCleaner( diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 7ec6bc07a947..6cb441b9caa3 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -166,6 +166,9 @@ CREATE TABLE system.ui ( "lastUpdated" TIMESTAMP NOT NULL );` + // JobsRunStatsIdxPredicate is the predicate in jobs_run_stats_idx in JobsTable. + JobsRunStatsIdxPredicate = `status IN ('running':::STRING, 'reverting':::STRING, 'pending':::STRING, 'pause-requested':::STRING, 'cancel-requested':::STRING)` + // Note: this schema is changed in a migration (a progress column is added in // a separate family). // NB: main column family uses old, pre created_by_type/created_by_id columns, named. @@ -181,12 +184,19 @@ CREATE TABLE system.jobs ( created_by_id INT, claim_session_id BYTES, claim_instance_id INT8, + num_runs INT8, + last_run TIMESTAMP, INDEX (status, created), INDEX (created_by_type, created_by_id) STORING (status), - + INDEX jobs_run_stats_idx ( + claim_session_id, + status, + created + ) STORING(last_run, num_runs, claim_instance_id) + WHERE ` + JobsRunStatsIdxPredicate + `, FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id), FAMILY progress (progress), - FAMILY claim (claim_session_id, claim_instance_id) + FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run) );` // web_sessions are used to track authenticated user actions over stateless @@ -965,8 +975,7 @@ var ( NextMutationID: 1, }) - nowString = "now():::TIMESTAMP" - nowTZString = "now():::TIMESTAMPTZ" + nowString = "now():::TIMESTAMP" // JobsTable is the descriptor for the jobs table. JobsTable = makeTable(descpb.TableDescriptor{ @@ -985,8 +994,10 @@ var ( {Name: "created_by_id", ID: 7, Type: types.Int, Nullable: true}, {Name: "claim_session_id", ID: 8, Type: types.Bytes, Nullable: true}, {Name: "claim_instance_id", ID: 9, Type: types.Int, Nullable: true}, + {Name: "num_runs", ID: 10, Type: types.Int, Nullable: true}, + {Name: "last_run", ID: 11, Type: types.Timestamp, Nullable: true}, }, - NextColumnID: 10, + NextColumnID: 12, Families: []descpb.ColumnFamilyDescriptor{ { // NB: We are using family name that existed prior to adding created_by_type and @@ -1007,8 +1018,8 @@ var ( { Name: "claim", ID: 2, - ColumnNames: []string{"claim_session_id", "claim_instance_id"}, - ColumnIDs: []descpb.ColumnID{8, 9}, + ColumnNames: []string{"claim_session_id", "claim_instance_id", "num_runs", "last_run"}, + ColumnIDs: []descpb.ColumnID{8, 9, 10, 11}, }, }, NextFamilyID: 3, @@ -1036,8 +1047,21 @@ var ( KeySuffixColumnIDs: []descpb.ColumnID{1}, Version: descpb.StrictIndexColumnIDGuaranteesVersion, }, + { + Name: "jobs_run_stats_idx", + ID: 4, + Unique: false, + KeyColumnNames: []string{"claim_session_id", "status", "created"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{8, 2, 3}, + StoreColumnNames: []string{"last_run", "num_runs", "claim_instance_id"}, + StoreColumnIDs: []descpb.ColumnID{11, 10, 9}, + KeySuffixColumnIDs: []descpb.ColumnID{1}, + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + Predicate: JobsRunStatsIdxPredicate, + }, }, - NextIndexID: 4, + NextIndexID: 5, Privileges: descpb.NewCustomSuperuserPrivilegeDescriptor( descpb.SystemAllowedPrivileges[keys.JobsTableID], security.NodeUserName()), FormatVersion: descpb.InterleavedFormatVersion, @@ -1732,6 +1756,8 @@ var ( NextMutationID: 1, }) + nowTZString = "now():::TIMESTAMPTZ" + // ScheduledJobsTable is the descriptor for the scheduled jobs table. ScheduledJobsTable = makeTable(descpb.TableDescriptor{ Name: "scheduled_jobs", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 1bde7d8036c3..3d6c8122f7ed 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -906,6 +906,7 @@ type ExecutorConfig struct { FeatureFlagMetrics *featureflag.DenialMetrics TestingKnobs ExecutorTestingKnobs + MigrationTestingKnobs *migration.TestingKnobs PGWireTestingKnobs *PGWireTestingKnobs SchemaChangerTestingKnobs *SchemaChangerTestingKnobs NewSchemaChangerTestingKnobs *scexec.NewSchemaChangerTestingKnobs diff --git a/pkg/sql/lex/BUILD.bazel b/pkg/sql/lex/BUILD.bazel index c416990e4b1f..c5523afe3b6a 100644 --- a/pkg/sql/lex/BUILD.bazel +++ b/pkg/sql/lex/BUILD.bazel @@ -16,7 +16,6 @@ go_library( go_test( name = "lex_test", - size = "small", srcs = ["encode_test.go"], deps = [ ":lex", diff --git a/pkg/sql/logictest/BUILD.bazel b/pkg/sql/logictest/BUILD.bazel index 49b0f517f57f..061d03b3266d 100644 --- a/pkg/sql/logictest/BUILD.bazel +++ b/pkg/sql/logictest/BUILD.bazel @@ -16,7 +16,7 @@ go_library( "//pkg/build", "//pkg/clusterversion", "//pkg/kv/kvserver", - "//pkg/migration/migrationmanager", + "//pkg/migration", "//pkg/roachpb:with-mocks", "//pkg/security", "//pkg/server", diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index b9f267dd0649..f7a1a4d546b6 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -42,7 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/migration/migrationmanager" + "github.com/cockroachdb/cockroach/pkg/migration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" @@ -1414,9 +1414,9 @@ func (t *logicTest) newCluster(serverArgs TestServerArgs) { from := clusterversion.ClusterVersion{Version: cfg.bootstrapVersion} to := clusterversion.ClusterVersion{Version: cfg.binaryVersion} if len(clusterversion.ListBetween(from, to)) == 0 { - mm, ok := nodeParams.Knobs.MigrationManager.(*migrationmanager.TestingKnobs) + mm, ok := nodeParams.Knobs.MigrationManager.(*migration.TestingKnobs) if !ok { - mm = &migrationmanager.TestingKnobs{} + mm = &migration.TestingKnobs{} nodeParams.Knobs.MigrationManager = mm } mm.ListBetweenOverride = func( diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index e8248aed93eb..804fe8f1c8e2 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -1939,6 +1939,8 @@ system public jobs created system public jobs created_by_id 7 system public jobs created_by_type 6 system public jobs id 1 +system public jobs last_run 11 +system public jobs num_runs 10 system public jobs payload 4 system public jobs progress 5 system public jobs status 2 diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 0bb9275b747c..440aeeca9e6e 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -1053,7 +1053,6 @@ indexrelid indrelid indnatts indisunique indisprimary indisexclusion indim 3752917847 27 2 true true false true false true false false true false 1 2 0 0 0 0 2 2 NULL NULL 2 3966258450 14 1 true true false true false true false false true false 1 3403232968 0 2 NULL NULL 1 4012654114 30 3 true true false true false true false false true false 1 2 3 0 0 3403232968 0 0 0 2 2 2 NULL NULL 3 -4133203393 45 2 true true false true false true false false true false 1 2 0 0 0 0 2 2 NULL NULL 2 4225994721 13 2 true true false true false true false false true false 1 7 0 0 0 0 2 2 NULL NULL 2 # From #26504 @@ -1146,8 +1145,6 @@ indexrelid operator_argument_type_oid operator_argument_position 4012654114 0 1 4012654114 0 2 4012654114 0 3 -4133203393 0 1 -4133203393 0 2 4225994721 0 1 4225994721 0 2 diff --git a/pkg/sql/logictest/testdata/logic_test/system b/pkg/sql/logictest/testdata/logic_test/system index a1355bacab53..a2e01e7ac3cd 100644 --- a/pkg/sql/logictest/testdata/logic_test/system +++ b/pkg/sql/logictest/testdata/logic_test/system @@ -163,15 +163,17 @@ lastUpdated TIMESTAMP false NULL · {primary} false query TTBTTTB SHOW COLUMNS FROM system.jobs ---- -id INT8 false unique_rowid() · {jobs_created_by_type_created_by_id_idx,jobs_status_created_idx,primary} false -status STRING false NULL · {jobs_created_by_type_created_by_id_idx,jobs_status_created_idx,primary} false -created TIMESTAMP false now():::TIMESTAMP · {jobs_status_created_idx,primary} false -payload BYTES false NULL · {primary} false -progress BYTES true NULL · {primary} false -created_by_type STRING true NULL · {jobs_created_by_type_created_by_id_idx,primary} false -created_by_id INT8 true NULL · {jobs_created_by_type_created_by_id_idx,primary} false -claim_session_id BYTES true NULL · {primary} false -claim_instance_id INT8 true NULL · {primary} false +id INT8 false unique_rowid() · {jobs_created_by_type_created_by_id_idx,jobs_run_stats_idx,jobs_status_created_idx,primary} false +status STRING false NULL · {jobs_created_by_type_created_by_id_idx,jobs_run_stats_idx,jobs_status_created_idx,primary} false +created TIMESTAMP false now():::TIMESTAMP · {jobs_run_stats_idx,jobs_status_created_idx,primary} false +payload BYTES false NULL · {primary} false +progress BYTES true NULL · {primary} false +created_by_type STRING true NULL · {jobs_created_by_type_created_by_id_idx,primary} false +created_by_id INT8 true NULL · {jobs_created_by_type_created_by_id_idx,primary} false +claim_session_id BYTES true NULL · {jobs_run_stats_idx,primary} false +claim_instance_id INT8 true NULL · {jobs_run_stats_idx,primary} false +num_runs INT8 true NULL · {jobs_run_stats_idx,primary} false +last_run TIMESTAMP true NULL · {jobs_run_stats_idx,primary} false query TTBTTTB SHOW COLUMNS FROM system.settings diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 7de59398935f..5d505189641d 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -2442,7 +2442,6 @@ var charts = []sectionDescription{ { Title: "Round", Metrics: []string{ - "schedules.round.schedules-ready-to-run", "schedules.round.reschedule-skip", "schedules.round.reschedule-wait", @@ -2641,6 +2640,20 @@ var charts = []sectionDescription{ }, }, }, + { + Organization: [][]string{{Jobs, "Registry"}}, + Charts: []chartDescription{ + { + Title: "Jobs Registry Stats", + Metrics: []string{ + "jobs.adopt_iterations", + "jobs.claimed_jobs", + "jobs.resumed_claimed_jobs", + }, + AxisLabel: "Count", + }, + }, + }, { Organization: [][]string{{Process, "Node", "Admission"}}, Charts: []chartDescription{