Skip to content

Commit

Permalink
jobs: rename Resume to Unpause
Browse files Browse the repository at this point in the history
Resume verb is already used to for the action of starting a job
after adoption on a node. Here instead Resume is the opposite
action of pausing a job.

Release note: none.
  • Loading branch information
Spas Bojanov committed Jun 8, 2020
1 parent 5689836 commit a1028aa
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 14 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,8 @@ func TestCSVImportCanBeResumed(t *testing.T) {
resumePos := js.prog.ResumePos[0]
t.Logf("Resume pos: %v\n", js.prog.ResumePos[0])

// Resume the job and wait for it to complete.
if err := registry.Resume(ctx, nil, jobID); err != nil {
// Unpause the job and wait for it to complete.
if err := registry.Unpause(ctx, nil, jobID); err != nil {
t.Fatal(err)
}
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return jobs.StatusSucceeded == js.status })
Expand Down Expand Up @@ -763,8 +763,8 @@ func TestCSVImportMarksFilesFullyProcessed(t *testing.T) {
// Send cancellation and unblock import.
proceedImport()

// Resume the job and wait for it to complete.
if err := registry.Resume(ctx, nil, jobID); err != nil {
// Unpause the job and wait for it to complete.
if err := registry.Unpause(ctx, nil, jobID); err != nil {
t.Fatal(err)
}
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return jobs.StatusSucceeded == js.status })
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ func (j *Job) paused(ctx context.Context, fn func(context.Context, *kv.Txn) erro
})
}

// resumed sets the status of the tracked job to running or reverting iff the
// unpaused sets the status of the tracked job to running or reverting iff the
// job is currently paused. It does not directly resume the job; rather, it
// expires the job's lease so that a Registry adoption loop detects it and
// resumes it.
func (j *Job) resumed(ctx context.Context) error {
func (j *Job) unpaused(ctx context.Context) error {
return j.Update(ctx, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
if md.Status == StatusRunning || md.Status == StatusReverting {
// Already resumed - do nothing.
Expand Down
8 changes: 4 additions & 4 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,11 +1094,11 @@ func TestJobLifecycle(t *testing.T) {
if err := exp.verify(job.ID(), jobs.StatusPaused); err != nil {
t.Fatal(err)
}
if err := registry.Resume(ctx, nil, *job.ID()); err != nil {
if err := registry.Unpause(ctx, nil, *job.ID()); err != nil {
t.Fatal(err)
}
// Resume the job again to ensure that the resumption is idempotent.
if err := registry.Resume(ctx, nil, *job.ID()); err != nil {
if err := registry.Unpause(ctx, nil, *job.ID()); err != nil {
t.Fatal(err)
}
if err := exp.verify(job.ID(), jobs.StatusRunning); err != nil {
Expand Down Expand Up @@ -1172,7 +1172,7 @@ func TestJobLifecycle(t *testing.T) {
if err := registry.CancelRequested(ctx, nil, *job.ID()); err != nil {
t.Fatal(err)
}
if err := registry.Resume(ctx, nil, *job.ID()); !testutils.IsError(err, "cannot be resumed") {
if err := registry.Unpause(ctx, nil, *job.ID()); !testutils.IsError(err, "cannot be resumed") {
t.Errorf("got unexpected status '%v'", err)
}
}
Expand All @@ -1183,7 +1183,7 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}
expectedErr := fmt.Sprintf("job with status %s cannot be resumed", jobs.StatusSucceeded)
if err := registry.Resume(ctx, nil, *job.ID()); !testutils.IsError(err, expectedErr) {
if err := registry.Unpause(ctx, nil, *job.ID()); !testutils.IsError(err, expectedErr) {
t.Errorf("expected '%s', but got '%v'", expectedErr, err)
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,13 +709,14 @@ func (r *Registry) Failed(ctx context.Context, txn *kv.Txn, id int64, causingErr
return job.WithTxn(txn).failed(ctx, causingError, nil)
}

// Resume resumes the paused job with id using the specified txn (may be nil).
func (r *Registry) Resume(ctx context.Context, txn *kv.Txn, id int64) error {
// Unpause changes the paused job with id to running or reverting using the
// specified txn (may be nil).
func (r *Registry) Unpause(ctx context.Context, txn *kv.Txn, id int64) error {
job, _, err := r.getJobFn(ctx, txn, id)
if err != nil {
return err
}
return job.WithTxn(txn).resumed(ctx)
return job.WithTxn(txn).unpaused(ctx)
}

// Resumer is a resumable job, and is associated with a Job object. Jobs can be
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/control_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (n *controlJobsNode) startExec(params runParams) error {
case jobs.StatusPaused:
err = reg.PauseRequested(params.ctx, params.p.txn, int64(jobID))
case jobs.StatusRunning:
err = reg.Resume(params.ctx, params.p.txn, int64(jobID))
err = reg.Unpause(params.ctx, params.p.txn, int64(jobID))
case jobs.StatusCanceled:
err = reg.CancelRequested(params.ctx, params.p.txn, int64(jobID))
default:
Expand Down

0 comments on commit a1028aa

Please sign in to comment.