diff --git a/CHANGELOG.md b/CHANGELOG.md index b860912c..dfb4e414 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix possible Client shutdown panics if the user-provided context is cancelled while jobs are still running. [PR #401](https://github.com/riverqueue/river/pull/401). + ## [0.7.0] - 2024-06-13 ### Added diff --git a/client.go b/client.go index d1d897ee..b9cdfd8d 100644 --- a/client.go +++ b/client.go @@ -305,7 +305,6 @@ type Client[TTx any] struct { baseStartStop startstop.BaseStartStop completer jobcompleter.JobCompleter - completerSubscribeCh chan []jobcompleter.CompleterJobUpdated config *Config driver riverdriver.Driver[TTx] elector *leadership.Elector @@ -670,9 +669,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // Each time we start, we need a fresh completer subscribe channel to // send job completion events on, because the completer will close it // each time it shuts down. - c.completerSubscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 10) - c.completer.ResetSubscribeChan(c.completerSubscribeCh) - c.subscriptionManager.ResetSubscribeChan(c.completerSubscribeCh) + completerSubscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 10) + c.completer.ResetSubscribeChan(completerSubscribeCh) + c.subscriptionManager.ResetSubscribeChan(completerSubscribeCh) // In case of error, stop any services that might have started. This // is safe because even services that were never started will still @@ -694,11 +693,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // The completer is part of the services list below, but although it can // stop gracefully along with all the other services, it needs to be - // started with a context that's _not_ fetchCtx. This ensures that even - // when fetch is cancelled on shutdown, the completer is still given a - // separate opportunity to start stopping only after the producers have - // finished up and returned. - if err := c.completer.Start(ctx); err != nil { + // started with a context that's _not_ cancelled if the user-provided + // context is cancelled. This ensures that even when fetch is cancelled on + // shutdown, the completer is still given a separate opportunity to start + // stopping only after the producers have finished up and returned. + if err := c.completer.Start(context.WithoutCancel(ctx)); err != nil { stopServicesOnError() return err } @@ -744,7 +743,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error { <-fetchCtx.Done() // On stop, have the producers stop fetching first of all. + c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers") stopProducers() + c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": All producers stopped") // Stop all mainline services where stop order isn't important. startstop.StopAllParallel(append( diff --git a/client_test.go b/client_test.go index 392d3c00..b7815f26 100644 --- a/client_test.go +++ b/client_test.go @@ -836,6 +836,31 @@ func Test_Client_Stop(t *testing.T) { }) } +func Test_Client_Stop_AfterContextCancelled(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // doneCh will never close, job will exit due to context cancellation: + doneCh := make(chan struct{}) + startedCh := make(chan int64) + + dbPool := riverinternaltest.TestDB(ctx, t) + client := newTestClient(t, dbPool, newTestConfig(t, makeAwaitCallback(startedCh, doneCh))) + require.NoError(t, client.Start(ctx)) + t.Cleanup(func() { require.NoError(t, client.Stop(context.Background())) }) + + insertRes, err := client.Insert(ctx, callbackArgs{}, nil) + require.NoError(t, err) + startedJobID := riverinternaltest.WaitOrTimeout(t, startedCh) + require.Equal(t, insertRes.Job.ID, startedJobID) + + cancel() + + require.ErrorIs(t, client.Stop(ctx), context.Canceled) +} + func Test_Client_StopAndCancel(t *testing.T) { t.Parallel() diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 5ad7a5a0..44f98967 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -393,6 +393,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { batchID, batchFinalizedAt = mapIDsAndFinalizedAt(setStateBatch) jobRows []*rivertype.JobRow ) + c.Logger.DebugContext(ctx, c.Name+": Completing batch of job(s)", "num_jobs", len(setStateBatch)) if len(setStateBatch) > c.completionMaxSize { jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch)) for i := 0; i < len(setStateBatch); i += c.completionMaxSize { diff --git a/producer.go b/producer.go index 27928aef..0ac6cd7d 100644 --- a/producer.go +++ b/producer.go @@ -215,6 +215,12 @@ func (p *producer) Start(ctx context.Context) error { return p.StartWorkContext(ctx, ctx) } +func (p *producer) Stop() { + p.Logger.Debug(p.Name + ": Stopping") + p.BaseStartStop.Stop() + p.Logger.Debug(p.Name + ": Stop returned") +} + // Start starts the producer. It backgrounds a goroutine which is stopped when // context is cancelled or Stop is invoked. //