Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shutdown panics by separating completer context #401

Merged
merged 2 commits into the base branch
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fix possible Client shutdown panics if the user-provided context is cancelled while jobs are still running. [PR #401](https://github.com/riverqueue/river/pull/401).

## [0.7.0] - 2024-06-13

### Added
Expand Down
19 changes: 10 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ type Client[TTx any] struct {
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
Expand Down Expand Up @@ -670,9 +669,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// Each time we start, we need a fresh completer subscribe channel to
// send job completion events on, because the completer will close it
// each time it shuts down.
c.completerSubscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 10)
c.completer.ResetSubscribeChan(c.completerSubscribeCh)
c.subscriptionManager.ResetSubscribeChan(c.completerSubscribeCh)
completerSubscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 10)
c.completer.ResetSubscribeChan(completerSubscribeCh)
c.subscriptionManager.ResetSubscribeChan(completerSubscribeCh)

// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
Expand All @@ -694,11 +693,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {

// The completer is part of the services list below, but although it can
// stop gracefully along with all the other services, it needs to be
// started with a context that's _not_ fetchCtx. This ensures that even
// when fetch is cancelled on shutdown, the completer is still given a
// separate opportunity to start stopping only after the producers have
// finished up and returned.
if err := c.completer.Start(ctx); err != nil {
// started with a context that's _not_ cancelled if the user-provided
// context is cancelled. This ensures that even when fetch is cancelled on
// shutdown, the completer is still given a separate opportunity to start
// stopping only after the producers have finished up and returned.
if err := c.completer.Start(context.WithoutCancel(ctx)); err != nil {
stopServicesOnError()
return err
}
Expand Down Expand Up @@ -744,7 +743,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
<-fetchCtx.Done()

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
stopProducers()
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": All producers stopped")

// Stop all mainline services where stop order isn't important.
startstop.StopAllParallel(append(
Expand Down
25 changes: 25 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,31 @@ func Test_Client_Stop(t *testing.T) {
})
}

// Test_Client_Stop_AfterContextCancelled verifies that stopping a client
// whose user-provided context has already been cancelled surfaces
// context.Canceled rather than panicking.
func Test_Client_Stop_AfterContextCancelled(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The worked job blocks on jobDoneCh, which is never closed, so it can
	// only exit via context cancellation.
	jobDoneCh := make(chan struct{})
	jobStartedCh := make(chan int64)

	dbPool := riverinternaltest.TestDB(ctx, t)
	config := newTestConfig(t, makeAwaitCallback(jobStartedCh, jobDoneCh))
	client := newTestClient(t, dbPool, config)
	require.NoError(t, client.Start(ctx))
	t.Cleanup(func() { require.NoError(t, client.Stop(context.Background())) })

	insertRes, err := client.Insert(ctx, callbackArgs{}, nil)
	require.NoError(t, err)
	require.Equal(t, insertRes.Job.ID, riverinternaltest.WaitOrTimeout(t, jobStartedCh))

	cancel()

	require.ErrorIs(t, client.Stop(ctx), context.Canceled)
}

func Test_Client_StopAndCancel(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
batchID, batchFinalizedAt = mapIDsAndFinalizedAt(setStateBatch)
jobRows []*rivertype.JobRow
)
c.Logger.DebugContext(ctx, c.Name+": Completing batch of job(s)", "num_jobs", len(setStateBatch))
if len(setStateBatch) > c.completionMaxSize {
jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch))
for i := 0; i < len(setStateBatch); i += c.completionMaxSize {
Expand Down
6 changes: 6 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ func (p *producer) Start(ctx context.Context) error {
return p.StartWorkContext(ctx, ctx)
}

// Stop shuts the producer down via its embedded BaseStartStop, emitting a
// debug log line on either side so a hung shutdown is easy to spot.
func (p *producer) Stop() {
	logPrefix := p.Name + ": "
	p.Logger.Debug(logPrefix + "Stopping")
	p.BaseStartStop.Stop()
	p.Logger.Debug(logPrefix + "Stop returned")
}

// Start starts the producer. It backgrounds a goroutine which is stopped when
// context is cancelled or Stop is invoked.
//
Expand Down
Loading