Skip to content

Commit

Permalink
jobs: break up new stages of job lifecycle movement
Browse files Browse the repository at this point in the history
In the PR which adopted the sqlliveness sessions, we shoved all of the stages
of adopting jobs into the same stage and we invoked that stage on each adoption
interval and on each sent to the adoption channel.

These stages are:

 * Cancel jobs
 * Serve pause and cancel requests
 * Delete claims due to dead sessions
 * Claim jobs
 * Process claimed jobs

This is problematic for tests which send on the adoption channel at a high
rate. One important thing to note is that all jobs which are sent on the
adoption channel are already claimed.

After this PR we move the first three steps above into the cancellation
loop we were already running. We also increase the default interval for
that loop as it was exceedingly frequent at 1s for no obvious reason.

Much of the testing changes are due to this cancelation loop duration
change. The tests in this package now run 3x faster (10s vs 30s).

Then, upon sends on the adoption channel, we just process claimed jobs.
When the adoption interval rolls around, then we attempt to both claim
and process jobs.

Release justification: bug fixes and low-risk updates to new functionality
Release note: None
  • Loading branch information
ajwerner committed Aug 28, 2020
1 parent 2f5c6b8 commit 1281a3c
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 250 deletions.
12 changes: 2 additions & 10 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,11 +1334,7 @@ func createAndWaitForJob(
func TestBackupRestoreResume(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

ctx := context.Background()

Expand Down Expand Up @@ -1478,11 +1474,7 @@ func TestBackupRestoreControlJob(t *testing.T) {

// force every call to update
defer jobs.TestingSetProgressThresholds()()

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

serverArgs := base.TestServerArgs{}
// Disable external processing of mutations so that the final check of
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/backupccl/restore_mid_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ func restoreMidSchemaChange(backupDir, schemaChangeName string) func(t *testing.

return func(t *testing.T) {
params := base.TestServerArgs{}
defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond

defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

const numAccounts = 1000
_, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts,
InitNone, base.TestClusterArgs{ServerArgs: params})
Expand Down
27 changes: 7 additions & 20 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2014,8 +2014,7 @@ func TestChangefeedPauseUnpause(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
Expand Down Expand Up @@ -2074,9 +2073,7 @@ func TestChangefeedPauseUnpause(t *testing.T) {
func TestChangefeedPauseUnpauseCursorAndInitialScan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
Expand Down Expand Up @@ -2135,9 +2132,7 @@ func TestChangefeedPauseUnpauseCursorAndInitialScan(t *testing.T) {
func TestChangefeedProtectedTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

var (
ctx = context.Background()
Expand Down Expand Up @@ -2303,9 +2298,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) {
t.Run(`enterprise`, enterpriseTest(func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
Expand Down Expand Up @@ -2384,8 +2377,7 @@ func TestChangefeedProtectedTimestampsVerificationFails(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

verifyRequestCh := make(chan *roachpb.AdminVerifyProtectedTimestampRequest, 1)
requestFilter := kvserverbase.ReplicaRequestFilter(func(
Expand Down Expand Up @@ -2520,10 +2512,7 @@ func TestChangefeedNodeShutdown(t *testing.T) {
defer log.Scope(t).Close(t)
skip.WithIssue(t, 32232)

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

flushCh := make(chan struct{}, 1)
defer close(flushCh)
Expand Down Expand Up @@ -2685,9 +2674,7 @@ func TestChangefeedMemBufferCapacity(t *testing.T) {
func TestChangefeedRestartDuringBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs.
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
func TestChangefeedNemeses(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
// TODO(dan): Ugly hack to disable `eventPause` in sinkless feeds. See comment in
Expand Down
22 changes: 4 additions & 18 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4025,10 +4025,7 @@ func TestImportControlJob(t *testing.T) {

skip.WithIssue(t, 51792, "TODO(dt): add knob to force faster progress checks.")

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

var serverArgs base.TestServerArgs
// Disable external processing of mutations so that the final check of
Expand Down Expand Up @@ -4290,10 +4287,7 @@ func TestImportWorkerFailure(t *testing.T) {
// node down are detected and retried.
skip.WithIssue(t, 51793, "flaky due to undetected kinds of failures when the node is shutdown")

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

allowResponse := make(chan struct{})
params := base.TestClusterArgs{}
Expand Down Expand Up @@ -4371,11 +4365,7 @@ func TestImportLivenessWithRestart(t *testing.T) {
skip.WithIssue(t, 51794, "TODO(dt): this relies on chunking done by prior version of IMPORT."+
"Rework this test, or replace it with resume-tests + jobs infra tests.")

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond
jobs.DefaultCancelInterval = 100 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

const nodes = 1
nl := jobs.NewFakeNodeLiveness(nodes)
Expand Down Expand Up @@ -4506,11 +4496,7 @@ func TestImportLivenessWithLeniency(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 100 * time.Millisecond
jobs.DefaultCancelInterval = 100 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

const nodes = 1
nl := jobs.NewFakeNodeLiveness(nodes)
Expand Down
4 changes: 3 additions & 1 deletion pkg/jobs/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ func (r *Registry) deprecatedCancelAllLocked(ctx context.Context) {
log.Warningf(ctx, "job %d: canceling due to liveness failure", jobID)
cancel()
}
r.mu.deprecatedJobs = make(map[int64]context.CancelFunc)
for jobID := range r.mu.deprecatedJobs {
delete(r.mu.deprecatedJobs, jobID)
}
}

// deprecatedRegister registers an about to be resumed job in memory so that it can be
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/executor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
func TestInlineExecutorFailedJobsHandling(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer TestingSetAdoptAndCancelIntervals(time.Millisecond, time.Microsecond)()
h, cleanup := newTestHelper(t)
defer cleanup()

Expand Down
54 changes: 27 additions & 27 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,17 @@ type counters struct {
}

type registryTestSuite struct {
ctx context.Context
oldInterval time.Duration
s serverutils.TestServerInterface
outerDB *gosql.DB
sqlDB *sqlutils.SQLRunner
registry *jobs.Registry
done chan struct{}
mockJob jobs.Record
job *jobs.Job
mu struct {
ctx context.Context
oldAdoptInterval time.Duration
oldCancelInterval time.Duration
s serverutils.TestServerInterface
outerDB *gosql.DB
sqlDB *sqlutils.SQLRunner
registry *jobs.Registry
done chan struct{}
mockJob jobs.Record
job *jobs.Job
mu struct {
syncutil.Mutex
a counters
e counters
Expand All @@ -179,8 +180,10 @@ func noopPauseRequestFunc(
}

func (rts *registryTestSuite) setUp(t *testing.T) {
rts.oldInterval = jobs.DefaultAdoptInterval
rts.oldAdoptInterval = jobs.DefaultAdoptInterval
rts.oldCancelInterval = jobs.DefaultCancelInterval
jobs.DefaultAdoptInterval = time.Millisecond
jobs.DefaultCancelInterval = 2 * time.Millisecond
rts.ctx = context.Background()
rts.s, rts.outerDB, _ = serverutils.StartServer(t, base.TestServerArgs{})
rts.sqlDB = sqlutils.MakeSQLRunner(rts.outerDB)
Expand Down Expand Up @@ -270,7 +273,8 @@ func (rts *registryTestSuite) tearDown() {
close(rts.resumeCheckCh)
close(rts.done)
rts.s.Stopper().Stop(rts.ctx)
jobs.DefaultAdoptInterval = rts.oldInterval
jobs.DefaultAdoptInterval = rts.oldAdoptInterval
jobs.DefaultCancelInterval = rts.oldCancelInterval
jobs.ResetConstructors()()
}

Expand Down Expand Up @@ -306,6 +310,7 @@ func (rts *registryTestSuite) check(t *testing.T, expectedStatus jobs.Status) {
func TestRegistryLifecycle(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

t.Run("normal success", func(t *testing.T) {
rts := registryTestSuite{}
rts.setUp(t)
Expand Down Expand Up @@ -1390,6 +1395,7 @@ func TestShowJobs(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()
params, _ := tests.CreateTestServerParams()
s, rawSQLDB, _ := serverutils.StartServer(t, params)
sqlDB := sqlutils.MakeSQLRunner(rawSQLDB)
Expand Down Expand Up @@ -1780,10 +1786,8 @@ func TestShowJobWhenComplete(t *testing.T) {
defer log.Scope(t).Close(t)
// Canceling a job relies on adopt daemon to move the job to state
// reverting.
defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
Expand Down Expand Up @@ -1919,10 +1923,8 @@ func TestJobInTxn(t *testing.T) {
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 5 * time.Second
// Set the adoption interval to be very long to test the adoption channel.
defer jobs.TestingSetAdoptAndCancelIntervals(time.Hour, time.Hour)()

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
Expand Down Expand Up @@ -2274,10 +2276,7 @@ func TestUnmigratedSchemaChangeJobs(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()
defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond
defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

ctx := context.Background()

Expand Down Expand Up @@ -2308,9 +2307,10 @@ func TestUnmigratedSchemaChangeJobs(t *testing.T) {
select {
case <-resuming:
t.Fatal("job was resumed")
case <-time.After(time.Second):
// With an adopt interval of 10 ms, within 1 s we can be reasonably sure
// that the job was not adopted.
case <-time.After(100 * time.Millisecond):
// With an adopt interval of 10 ms, within 100ms we can be reasonably sure
// that the job was not adopted. At the very least, the test would be
// flakey.
}
})

Expand Down
Loading

0 comments on commit 1281a3c

Please sign in to comment.