Fix shutdown panics by separating completer context (#401)
* add failing test case for completer shutdown panic

This failing test case reproduces the issue in #400 100% of the time. The
underlying cause is that the `stopProducers()` call does not actually wait
until the producers are fully shut down before proceeding with the rest of
the shutdown.

* fix shutdown panics by separating completer context

Back in #258 / 702d5b2, the batch completer was added to improve
throughput. As part of that refactor, it was turned into a startstop
service that took a context on start. We took care to ensure that
the context provided to the completer was _not_ the `fetchCtx`
(cancelled on `Stop()`) but instead was the raw user-provided `ctx`,
specifically to make sure the completer could finish its work even after
fetches were stopped.
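
As a compressed, runnable illustration of that two-context arrangement (hypothetical names, not River's internals): fetching is tied to a cancellable `fetchCtx` derived from the user context, while the completer is started on the raw user context, so cancelling fetches on `Stop()` leaves the completer free to finish its work.

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx := context.Background()                    // raw user-provided context
	fetchCtx, stopFetch := context.WithCancel(ctx) // cancelled by Stop()

	stopFetch()

	fmt.Println(fetchCtx.Err()) // context.Canceled: fetching halts
	fmt.Println(ctx.Err())      // <nil>: the completer's context is unaffected
}
```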

This worked well if the whole shutdown process was done with `Stop` /
`StopAndCancel`, but it did not work if the user-provided context was
itself cancelled outside of River. In that scenario, the completer would
immediately begin shutting down upon cancellation, even without waiting
for producers to finish sending it any final jobs that needed to be
recorded. This went unnoticed until #379 / 0e57338 turned this scenario
into a panic instead of silent misbehavior, and that panic is what was
encountered in #400.

To fix this situation, we need to use Go 1.21's new
`context.WithoutCancel` API to fork the user-provided context so that we
maintain whatever else is stored in there (e.g. anything used by slog is
still available) but never cancel the completer's context. The completer
will manage its own shutdown when its `Stop()` is called as all of the
other client services are stopped in parallel.
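
For reference, a minimal demonstration of `context.WithoutCancel` semantics (this is standard library behavior as of Go 1.21, shown with illustrative names): the forked context keeps the parent's values but never inherits the parent's cancellation.

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	parent = context.WithValue(parent, ctxKey{}, "kept")

	detached := context.WithoutCancel(parent)
	cancel()

	fmt.Println(parent.Err())             // context.Canceled
	fmt.Println(detached.Err())           // <nil>: cancellation is not inherited
	fmt.Println(detached.Value(ctxKey{})) // "kept": values still flow through
}
```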

Fixes #400.
bgentry authored Jun 26, 2024
1 parent 220a636 · commit 6c6e868
Showing 5 changed files with 46 additions and 9 deletions.
CHANGELOG.md: 4 additions & 0 deletions
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Fix possible Client shutdown panics if the user-provided context is cancelled while jobs are still running. [PR #401](https://github.com/riverqueue/river/pull/401).
+
 ## [0.7.0] - 2024-06-13
 
 ### Added
client.go: 10 additions & 9 deletions
@@ -305,7 +305,6 @@ type Client[TTx any] struct {
 	baseStartStop startstop.BaseStartStop
 
 	completer            jobcompleter.JobCompleter
-	completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
 	config               *Config
 	driver               riverdriver.Driver[TTx]
 	elector              *leadership.Elector
@@ -670,9 +669,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
 		// Each time we start, we need a fresh completer subscribe channel to
 		// send job completion events on, because the completer will close it
 		// each time it shuts down.
-		c.completerSubscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 10)
-		c.completer.ResetSubscribeChan(c.completerSubscribeCh)
-		c.subscriptionManager.ResetSubscribeChan(c.completerSubscribeCh)
+		completerSubscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 10)
+		c.completer.ResetSubscribeChan(completerSubscribeCh)
+		c.subscriptionManager.ResetSubscribeChan(completerSubscribeCh)
 
 		// In case of error, stop any services that might have started. This
 		// is safe because even services that were never started will still
@@ -694,11 +693,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
 
 	// The completer is part of the services list below, but although it can
 	// stop gracefully along with all the other services, it needs to be
-	// started with a context that's _not_ fetchCtx. This ensures that even
-	// when fetch is cancelled on shutdown, the completer is still given a
-	// separate opportunity to start stopping only after the producers have
-	// finished up and returned.
-	if err := c.completer.Start(ctx); err != nil {
+	// started with a context that's _not_ cancelled if the user-provided
+	// context is cancelled. This ensures that even when fetch is cancelled on
+	// shutdown, the completer is still given a separate opportunity to start
+	// stopping only after the producers have finished up and returned.
+	if err := c.completer.Start(context.WithoutCancel(ctx)); err != nil {
 		stopServicesOnError()
 		return err
 	}
@@ -744,7 +743,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
 		<-fetchCtx.Done()
 
 		// On stop, have the producers stop fetching first of all.
+		c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
 		stopProducers()
+		c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": All producers stopped")
 
 		// Stop all mainline services where stop order isn't important.
 		startstop.StopAllParallel(append(
client_test.go: 25 additions & 0 deletions
@@ -836,6 +836,31 @@ func Test_Client_Stop(t *testing.T) {
 	})
 }
 
+func Test_Client_Stop_AfterContextCancelled(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// doneCh will never close, job will exit due to context cancellation:
+	doneCh := make(chan struct{})
+	startedCh := make(chan int64)
+
+	dbPool := riverinternaltest.TestDB(ctx, t)
+	client := newTestClient(t, dbPool, newTestConfig(t, makeAwaitCallback(startedCh, doneCh)))
+	require.NoError(t, client.Start(ctx))
+	t.Cleanup(func() { require.NoError(t, client.Stop(context.Background())) })
+
+	insertRes, err := client.Insert(ctx, callbackArgs{}, nil)
+	require.NoError(t, err)
+	startedJobID := riverinternaltest.WaitOrTimeout(t, startedCh)
+	require.Equal(t, insertRes.Job.ID, startedJobID)
+
+	cancel()
+
+	require.ErrorIs(t, client.Stop(ctx), context.Canceled)
+}
+
 func Test_Client_StopAndCancel(t *testing.T) {
 	t.Parallel()
 
internal/jobcompleter/job_completer.go: 1 addition & 0 deletions
@@ -393,6 +393,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
 		batchID, batchFinalizedAt = mapIDsAndFinalizedAt(setStateBatch)
 		jobRows                   []*rivertype.JobRow
 	)
+	c.Logger.DebugContext(ctx, c.Name+": Completing batch of job(s)", "num_jobs", len(setStateBatch))
 	if len(setStateBatch) > c.completionMaxSize {
 		jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch))
 		for i := 0; i < len(setStateBatch); i += c.completionMaxSize {
producer.go: 6 additions & 0 deletions
@@ -215,6 +215,12 @@ func (p *producer) Start(ctx context.Context) error {
 	return p.StartWorkContext(ctx, ctx)
 }
 
+func (p *producer) Stop() {
+	p.Logger.Debug(p.Name + ": Stopping")
+	p.BaseStartStop.Stop()
+	p.Logger.Debug(p.Name + ": Stop returned")
+}
+
 // Start starts the producer. It backgrounds a goroutine which is stopped when
 // context is cancelled or Stop is invoked.
 //
