Skip to content

Commit

Permalink
jobs: change TestingCreateAndStartJob from a registry method to a fun…
Browse files Browse the repository at this point in the history
…ction

Release note: None
  • Loading branch information
thoszhang committed Feb 23, 2021
1 parent ffe4a45 commit 91152f0
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4716,7 +4716,7 @@ func TestImportControlJobRBAC(t *testing.T) {
})

startLeasedJob := func(t *testing.T, record jobs.Record) *jobs.StartableJob {
job, err := registry.TestingCreateAndStartJob(ctx, nil, record)
job, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), record)
require.NoError(t, err)
return job
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStreamIngestionJobRollBack(t *testing.T) {
},
Progress: jobspb.StreamIngestionProgress{},
}
j, err := registry.TestingCreateAndStartJob(ctx, nil, streamIngestJobRecord)
j, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), streamIngestJobRecord)
require.NoError(t, err)

// Insert more data in the table. These changes should be rollback during job
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingutils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestCutoverBuiltin(t *testing.T) {
},
Progress: jobspb.StreamIngestionProgress{},
}
job, err := registry.TestingCreateAndStartJob(ctx, nil, streamIngestJobRecord)
job, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), streamIngestJobRecord)
require.NoError(t, err)

// Check that sentinel is not set.
Expand Down
52 changes: 26 additions & 26 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.setUp(t)
defer rts.tearDown()

j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand All @@ -338,7 +338,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.setUp(t)
defer rts.tearDown()

j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand All @@ -359,7 +359,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.setUp(t)
defer rts.tearDown()

j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.setUp(t)
defer rts.tearDown()

j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand All @@ -458,7 +458,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand All @@ -484,7 +484,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.setUp(t)
defer rts.tearDown()

j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.setUp(t)
defer rts.tearDown()

j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -553,7 +553,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
job, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
job, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -659,7 +659,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.setUp(t)
defer rts.tearDown()

j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -690,7 +690,7 @@ func TestRegistryLifecycle(t *testing.T) {

// Make marking success fail.
rts.successErr = errors.New("injected failure at marking as succeeded")
j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -723,7 +723,7 @@ func TestRegistryLifecycle(t *testing.T) {

// Make marking success fail.
rts.successErr = errors.New("resume failed")
j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -759,7 +759,7 @@ func TestRegistryLifecycle(t *testing.T) {
return nil
}

job, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
job, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
require.NoError(t, err)
rts.job = job

Expand Down Expand Up @@ -794,7 +794,7 @@ func TestRegistryLifecycle(t *testing.T) {
return errors.New("boom")
}

job, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob)
job, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
require.NoError(t, err)
rts.job = job

Expand Down Expand Up @@ -872,7 +872,7 @@ func TestJobLifecycle(t *testing.T) {

startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) {
beforeTime := timeutil.Now()
job, err := registry.TestingCreateAndStartJob(ctx, nil, record)
job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), record)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1838,7 +1838,7 @@ func TestShowJobWhenComplete(t *testing.T) {
t.Run("show job", func(t *testing.T) {
// Start a job and cancel it so it is in state finished and then query it with
// SHOW JOB WHEN COMPLETE.
job, err := registry.TestingCreateAndStartJob(ctx, nil, mockJob)
job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), mockJob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1874,23 +1874,23 @@ func TestShowJobWhenComplete(t *testing.T) {
t.Run("show jobs", func(t *testing.T) {
// Start two jobs and cancel the first one to make sure the
// query still blocks until the second job is also canceled.
var jobs [2]*jobs.StartableJob
for i := range jobs {
job, err := registry.TestingCreateAndStartJob(ctx, nil, mockJob)
var jobsToStart [2]*jobs.StartableJob
for i := range jobsToStart {
job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), mockJob)
if err != nil {
t.Fatal(err)
}
jobs[i] = job
jobsToStart[i] = job
}
if _, err := db.ExecContext(ctx, "CANCEL JOB $1", *jobs[0].ID()); err != nil {
if _, err := db.ExecContext(ctx, "CANCEL JOB $1", *jobsToStart[0].ID()); err != nil {
t.Fatal(err)
}
group := ctxgroup.WithContext(ctx)
group.GoCtx(func(ctx context.Context) error {
rows, err := db.QueryContext(ctx,
`SELECT job_id, status
FROM [SHOW JOBS WHEN COMPLETE (SELECT $1 UNION SELECT $2)]`,
*jobs[0].ID(), *jobs[1].ID())
*jobsToStart[0].ID(), *jobsToStart[1].ID())
if err != nil {
return err
}
Expand All @@ -1901,8 +1901,8 @@ func TestShowJobWhenComplete(t *testing.T) {
}
cnt += 1
switch out.id {
case *jobs[0].ID():
case *jobs[1].ID():
case *jobsToStart[0].ID():
case *jobsToStart[1].ID():
// SHOW JOBS WHEN COMPLETE finishes only after all jobs are
// canceled.
if out.status != "canceled" {
Expand All @@ -1913,7 +1913,7 @@ func TestShowJobWhenComplete(t *testing.T) {
default:
return errors.Errorf(
"Expected either id:%d or id:%d but got: %d",
*jobs[0].ID(), *jobs[1].ID(), out.id)
*jobsToStart[0].ID(), *jobsToStart[1].ID(), out.id)
}
}
if cnt != 2 {
Expand All @@ -1925,7 +1925,7 @@ func TestShowJobWhenComplete(t *testing.T) {
// SHOW JOBS WHEN COMPLETE does block until the job is canceled.
time.Sleep(2 * time.Millisecond)
var err error
if _, err = db.ExecContext(ctx, "CANCEL JOB $1", *jobs[1].ID()); err == nil {
if _, err = db.ExecContext(ctx, "CANCEL JOB $1", *jobsToStart[1].ID()); err == nil {
err = group.Wait()
}
if err != nil {
Expand Down
52 changes: 26 additions & 26 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,32 +286,6 @@ func (r *Registry) MakeJobID() int64 {
return int64(builtins.GenerateUniqueInt(r.nodeID.SQLInstanceID()))
}

// TestingCreateAndStartJob creates and asynchronously starts a job from record.
// An error is returned if the job type has not been registered with
// RegisterConstructor. The ctx passed to this function is not the context the
// job will be started with (canceling ctx will not cause the job to cancel).
func (r *Registry) TestingCreateAndStartJob(
ctx context.Context, resultsCh chan<- tree.Datums, record Record,
) (*StartableJob, error) {
var rj *StartableJob
jobID := r.MakeJobID()
if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
return r.CreateStartableJobWithTxn(ctx, &rj, jobID, txn, record)
}); err != nil {
if rj != nil {
if cleanupErr := rj.CleanupOnRollback(ctx); cleanupErr != nil {
log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
}
}
return nil, err
}
err := rj.Start(ctx)
if err != nil {
return nil, err
}
return rj, nil
}

// NotifyToAdoptJobs notifies the job adoption loop to start claimed jobs.
func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
select {
Expand Down Expand Up @@ -1366,3 +1340,29 @@ func (r *Registry) unregister(jobID int64) {
func (r *Registry) TestingNudgeAdoptionQueue() {
r.adoptionCh <- claimAndResumeClaimedJobs
}

// TestingCreateAndStartJob creates and asynchronously starts a job from record.
// An error is returned if the job type has not been registered with
// RegisterConstructor. The ctx passed to this function is not the context the
// job will be started with (canceling ctx will not cause the job to cancel).
func TestingCreateAndStartJob(
ctx context.Context, r *Registry, db *kv.DB, record Record,
) (*StartableJob, error) {
var rj *StartableJob
jobID := r.MakeJobID()
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
return r.CreateStartableJobWithTxn(ctx, &rj, jobID, txn, record)
}); err != nil {
if rj != nil {
if cleanupErr := rj.CleanupOnRollback(ctx); cleanupErr != nil {
log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
}
}
return nil, err
}
err := rj.Start(ctx)
if err != nil {
return nil, err
}
return rj, nil
}
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
}
job, err := newRegistry(nodeid).TestingCreateAndStartJob(ctx, nil, rec)
job, err := jobs.TestingCreateAndStartJob(ctx, newRegistry(nodeid), db, rec)
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/gcjob_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ go_test(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/gcjob",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
Expand Down
15 changes: 5 additions & 10 deletions pkg/sql/gcjob_test/gc_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -198,8 +197,7 @@ func TestSchemaChangeGCJob(t *testing.T) {
Details: details,
}

resultsCh := make(chan tree.Datums)
job, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, jobRecord)
job, err := jobs.TestingCreateAndStartJob(ctx, jobRegistry, kvDB, jobRecord)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -349,7 +347,7 @@ func TestGCResumer(t *testing.T) {
gcjob.SetSmallMaxGCIntervalForTest()

ctx := context.Background()
srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
srv, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
execCfg := srv.ExecutorConfig().(sql.ExecutorConfig)
jobRegistry := execCfg.JobRegistry
defer srv.Stopper().Stop(ctx)
Expand All @@ -366,8 +364,7 @@ func TestGCResumer(t *testing.T) {
Progress: jobspb.SchemaChangeGCProgress{},
}

resultsCh := make(chan tree.Datums)
sj, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, record)
sj, err := jobs.TestingCreateAndStartJob(ctx, jobRegistry, kvDB, record)
require.NoError(t, err)
require.NoError(t, sj.AwaitCompletion(ctx))
job, err := jobRegistry.LoadJob(ctx, *sj.ID())
Expand All @@ -393,8 +390,7 @@ func TestGCResumer(t *testing.T) {
Progress: jobspb.SchemaChangeGCProgress{},
}

resultsCh := make(chan tree.Datums)
sj, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, record)
sj, err := jobs.TestingCreateAndStartJob(ctx, jobRegistry, kvDB, record)
require.NoError(t, err)

_, err = sqlDB.Exec("ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
Expand Down Expand Up @@ -428,8 +424,7 @@ func TestGCResumer(t *testing.T) {
Progress: jobspb.SchemaChangeGCProgress{},
}

resultsCh := make(chan tree.Datums)
sj, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, record)
sj, err := jobs.TestingCreateAndStartJob(ctx, jobRegistry, kvDB, record)
require.NoError(t, err)
require.Error(t, sj.AwaitCompletion(ctx))
})
Expand Down

0 comments on commit 91152f0

Please sign in to comment.