Skip to content

Commit

Permalink
Expose Config.TestOnly configuration option for disabling staggered…
Browse files Browse the repository at this point in the history
… start

This one's a continuation of #385. Maintenance services have a staggered
start feature that causes them to sleep for a random amount of jittered
time on startup so they don't all try to work simultaneously. This is
useful in production, but somewhat harmful in tests because it makes
start and stop slower and thereby integration test cases slower.

River has an internal flag that allows staggered start to be disabled in
its own test suite, but external users of River have no way to access
this functionality.

Here, introduce `Config.TestOnly` that can be provided to client
configuration in test suites regardless of whether the caller is
internal or not.

This differs slightly from #385 in that it provides only a boolean, with
the idea being that if we find it useful to disable other features for
tests in the future, a boolean keeps third party code for forwards
compatible in that they get these disabled automatically. In case it
does become important to distinguish between individual features at some
later time, I figure we can add an additional `TestOnlyConfig` property
that allows full configuration beyond defaults.
  • Loading branch information
brandur committed Jul 3, 2024
1 parent f0eb3cf commit 70a34c8
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `Config.TestOnly` has been added. It disables various features in the River client like staggered maintenance service start that are useful in production, but may be somewhat harmful in tests because they make start/stop slower. [PR #414](https://github.com/riverqueue/river/pull/414).

### Fixed

- Pausing or resuming a queue that was already paused or not paused respectively no longer returns `rivertype.ErrNotFound`. The same goes for pausing or resuming using the all queues string (`*`) when no queues are in the database (previously that also returned `rivertype.ErrNotFound`). [PR #408](https://github.com/riverqueue/river/pull/408).
Expand Down
25 changes: 11 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,16 @@ type Config struct {
// Defaults to DefaultRetryPolicy.
RetryPolicy ClientRetryPolicy

// TestOnly holds configuration for test-only settings that should generally
// not be used outside of test suites.
TestOnly TestOnlyConfig
// TestOnly can be set to true to disable certain features that are useful
// in production, but which may be harmful to tests, in ways like having the
// effect of making them slower. It should not be used outside of test
// suites.
//
// For example, queue maintenance services normally stagger their startup
// with a random jittered sleep so they don't all try to work at the same
// time. This is nice in production, but makes starting and stopping the
// client in a test case slower.
TestOnly bool

// Workers is a bundle of registered job workers.
//
Expand Down Expand Up @@ -293,16 +300,6 @@ type QueueConfig struct {
MaxWorkers int
}

// TestOnlyConfig contains test-only settings that should generally not be used
// outside of test suites.
type TestOnlyConfig struct {
// DisableStaggerStart disables the normal random jittered sleep occurring in
// queue maintenance services to stagger their startup so they don't all try
// to work at the same time. Appropriate for use in tests to make sure that
// the client can always be started and stopped again hastily.
DisableStaggerStart bool
}

// Client is a single isolated instance of River. Your application may use
// multiple instances operating on different databases or Postgres schemas
// within a single database.
Expand Down Expand Up @@ -615,7 +612,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// started conditionally based on whether the client is the leader.
client.queueMaintainer = maintenance.NewQueueMaintainer(archetype, maintenanceServices)

if config.TestOnly.DisableStaggerStart {
if config.TestOnly {
client.queueMaintainer.StaggerStartupDisable(true)
}
}
Expand Down
10 changes: 6 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config {
Logger: riverinternaltest.Logger(t),
MaxAttempts: MaxAttemptsDefault,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}},
TestOnly: TestOnlyConfig{DisableStaggerStart: true}, // disables staggered start in maintenance services
TestOnly: true, // disables staggered start in maintenance services
Workers: workers,
schedulerInterval: riverinternaltest.SchedulerShortInterval,
time: &riverinternaltest.TimeStub{},
Expand Down Expand Up @@ -3957,9 +3957,11 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Equal(t, maintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, maintenance.CompletedJobRetentionPeriodDefault, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, maintenance.DiscardedJobRetentionPeriodDefault, jobCleaner.Config.DiscardedJobRetentionPeriod)
require.False(t, jobCleaner.StaggerStartupIsDisabled())

enqueuer := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
require.Zero(t, enqueuer.Config.AdvisoryLockPrefix)
require.False(t, enqueuer.StaggerStartupIsDisabled())

require.Nil(t, client.config.ErrorHandler)
require.Equal(t, FetchCooldownDefault, client.config.FetchCooldown)
Expand All @@ -3968,7 +3970,6 @@ func Test_NewClient_Defaults(t *testing.T) {
require.NotZero(t, client.baseService.Logger)
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy)
require.False(t, client.config.TestOnly.DisableStaggerStart)
}

func Test_NewClient_Overrides(t *testing.T) {
Expand Down Expand Up @@ -3998,7 +3999,7 @@ func Test_NewClient_Overrides(t *testing.T) {
MaxAttempts: 5,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
RetryPolicy: retryPolicy,
TestOnly: TestOnlyConfig{DisableStaggerStart: true}, // disables staggered start in maintenance services
TestOnly: true, // disables staggered start in maintenance services
Workers: workers,
})
require.NoError(t, err)
Expand All @@ -4009,9 +4010,11 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, 1*time.Hour, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, 2*time.Hour, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, 3*time.Hour, jobCleaner.Config.DiscardedJobRetentionPeriod)
require.True(t, jobCleaner.StaggerStartupIsDisabled())

enqueuer := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
require.Equal(t, int32(123_456), enqueuer.Config.AdvisoryLockPrefix)
require.True(t, enqueuer.StaggerStartupIsDisabled())

require.Equal(t, errorHandler, client.config.ErrorHandler)
require.Equal(t, 123*time.Millisecond, client.config.FetchCooldown)
Expand All @@ -4020,7 +4023,6 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, logger, client.baseService.Logger)
require.Equal(t, 5, client.config.MaxAttempts)
require.Equal(t, retryPolicy, client.config.RetryPolicy)
require.True(t, client.config.TestOnly.DisableStaggerStart)
}

func Test_NewClient_MissingParameters(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions internal/maintenance/queue_maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (s *queueMaintainerServiceBase) StaggerStartupDisable(disabled bool) {
s.staggerStartupDisabled = disabled
}

func (s *queueMaintainerServiceBase) StaggerStartupIsDisabled() bool {
return s.staggerStartupDisabled
}

// withStaggerStartupDisable is an interface to a service whose stagger startup
// sleep can be disable.
type withStaggerStartupDisable interface {
Expand Down

0 comments on commit 70a34c8

Please sign in to comment.